This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 946d89cb95 [Feature] Support catalog in MaxCompute Source (#5283)
946d89cb95 is described below
commit 946d89cb952571dbc90290a1027254a83f6dc5d9
Author: Wenjun Ruan <[email protected]>
AuthorDate: Wed Oct 25 18:15:31 2023 +0800
[Feature] Support catalog in MaxCompute Source (#5283)
---
docs/en/connector-v2/source/Maxcompute.md | 15 +
.../com/aliyun/odps/type/SimpleArrayTypeInfo.java | 52 ----
.../com/aliyun/odps/type/SimpleMapTypeInfo.java | 63 -----
.../com/aliyun/odps/type/SimpleStructTypeInfo.java | 102 -------
.../maxcompute/catalog/MaxComputeCatalog.java | 160 +++++++++++
.../MaxComputeCatalogFactory.java} | 26 +-
.../catalog/MaxComputeDataTypeConvertor.java | 302 +++++++++++++++++++++
.../maxcompute/config/MaxcomputeConfig.java | 3 +
.../seatunnel/maxcompute/sink/MaxcomputeSink.java | 4 +-
.../maxcompute/sink/MaxcomputeSinkFactory.java | 3 +-
.../maxcompute/source/MaxcomputeSource.java | 16 +-
.../maxcompute/source/MaxcomputeSourceFactory.java | 6 +-
.../maxcompute/util/MaxcomputeTypeMapper.java | 111 +-------
.../catalog/MaxComputeDataTypeConvertorTest.java | 71 +++++
.../maxcompute/source/MaxcomputeSourceTest.java | 48 ++++
15 files changed, 647 insertions(+), 335 deletions(-)
diff --git a/docs/en/connector-v2/source/Maxcompute.md
b/docs/en/connector-v2/source/Maxcompute.md
index f30be5a0d2..cb9bc32dd3 100644
--- a/docs/en/connector-v2/source/Maxcompute.md
+++ b/docs/en/connector-v2/source/Maxcompute.md
@@ -26,6 +26,7 @@ Used to read data from Maxcompute.
| partition_spec | string | no | - |
| split_row | int | no | 10000 |
| common-options | string | no | |
+| schema | config | no | |
### accessId [string]
@@ -59,6 +60,12 @@ Used to read data from Maxcompute.
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details.
+### schema [config]
+
+#### fields [config]
+
+The schema of the data read from MaxCompute, declared under `fields` as `column_name = type` pairs (see the example below).
+
## Examples
```hocon
@@ -71,6 +78,13 @@ source {
table_name="<your table name>"
#partition_spec="<your partition spec>"
#split_row = 10000
+ schema {
+ fields {
+ name = string
+ age = int
+ gender = string
+ }
+ }
}
}
```
@@ -80,4 +94,5 @@ source {
### next version
- [Feature] Add Maxcompute Source
Connector([3640](https://github.com/apache/seatunnel/pull/3640))
+- [Feature] Support Schema in MaxCompute Source([5283](https://github.com/apache/seatunnel/pull/5283))
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleArrayTypeInfo.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleArrayTypeInfo.java
deleted file mode 100644
index 76fddc10c4..0000000000
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleArrayTypeInfo.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.aliyun.odps.type;
-
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
-
-import com.aliyun.odps.OdpsType;
-
-public class SimpleArrayTypeInfo implements ArrayTypeInfo {
- private final TypeInfo valueType;
-
- SimpleArrayTypeInfo(TypeInfo typeInfo) {
- if (typeInfo == null) {
- throw new MaxcomputeConnectorException(
- CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Invalid element
type.");
- } else {
- this.valueType = typeInfo;
- }
- }
-
- public String getTypeName() {
- return this.getOdpsType().name() + "<" + this.valueType.getTypeName()
+ ">";
- }
-
- public TypeInfo getElementTypeInfo() {
- return this.valueType;
- }
-
- public OdpsType getOdpsType() {
- return OdpsType.ARRAY;
- }
-
- public String toString() {
- return this.getTypeName();
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleMapTypeInfo.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleMapTypeInfo.java
deleted file mode 100644
index 6cedcd22bb..0000000000
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleMapTypeInfo.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.aliyun.odps.type;
-
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
-
-import com.aliyun.odps.OdpsType;
-
-public class SimpleMapTypeInfo implements MapTypeInfo {
- private final TypeInfo keyType;
- private final TypeInfo valueType;
-
- SimpleMapTypeInfo(TypeInfo keyType, TypeInfo valueType) {
- if (keyType != null && valueType != null) {
- this.keyType = keyType;
- this.valueType = valueType;
- } else {
- throw new MaxcomputeConnectorException(
- CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Invalid key or
value type for map.");
- }
- }
-
- public String getTypeName() {
- return this.getOdpsType().name()
- + "<"
- + this.keyType.getTypeName()
- + ","
- + this.valueType.getTypeName()
- + ">";
- }
-
- public TypeInfo getKeyTypeInfo() {
- return this.keyType;
- }
-
- public TypeInfo getValueTypeInfo() {
- return this.valueType;
- }
-
- public OdpsType getOdpsType() {
- return OdpsType.MAP;
- }
-
- public String toString() {
- return this.getTypeName();
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleStructTypeInfo.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleStructTypeInfo.java
deleted file mode 100644
index a628355221..0000000000
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/com/aliyun/odps/type/SimpleStructTypeInfo.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.aliyun.odps.type;
-
-import org.apache.seatunnel.common.exception.CommonErrorCode;
-import
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
-
-import com.aliyun.odps.OdpsType;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-public class SimpleStructTypeInfo implements StructTypeInfo {
- private final List<String> fieldNames;
- private final List<TypeInfo> fieldTypeInfos;
-
- SimpleStructTypeInfo(List<String> names, List<TypeInfo> typeInfos) {
- this.validateParameters(names, typeInfos);
- this.fieldNames = this.toLowerCase(names);
- this.fieldTypeInfos = new ArrayList(typeInfos);
- }
-
- private List<String> toLowerCase(List<String> names) {
- List<String> lowerNames = new ArrayList(names.size());
- Iterator var3 = names.iterator();
-
- while (var3.hasNext()) {
- String name = (String) var3.next();
- lowerNames.add(name.toLowerCase());
- }
-
- return lowerNames;
- }
-
- private void validateParameters(List<String> names, List<TypeInfo>
typeInfos) {
- if (names != null && typeInfos != null && !names.isEmpty() &&
!typeInfos.isEmpty()) {
- if (names.size() != typeInfos.size()) {
- throw new MaxcomputeConnectorException(
- CommonErrorCode.UNSUPPORTED_DATA_TYPE,
- "The amount of field names must be equal to the amount
of field types.");
- }
- } else {
- throw new MaxcomputeConnectorException(
- CommonErrorCode.UNSUPPORTED_DATA_TYPE,
- "Invalid name or element type for struct.");
- }
- }
-
- public String getTypeName() {
- StringBuilder stringBuilder = new
StringBuilder(this.getOdpsType().name());
- stringBuilder.append("<");
-
- for (int i = 0; i < this.fieldNames.size(); ++i) {
- if (i > 0) {
- stringBuilder.append(",");
- }
-
- stringBuilder.append((String) this.fieldNames.get(i));
- stringBuilder.append(":");
- stringBuilder.append(((TypeInfo)
this.fieldTypeInfos.get(i)).getTypeName());
- }
-
- stringBuilder.append(">");
- return stringBuilder.toString();
- }
-
- public List<String> getFieldNames() {
- return this.fieldNames;
- }
-
- public List<TypeInfo> getFieldTypeInfos() {
- return this.fieldTypeInfos;
- }
-
- public int getFieldCount() {
- return this.fieldNames.size();
- }
-
- public OdpsType getOdpsType() {
- return OdpsType.STRUCT;
- }
-
- public String toString() {
- return this.getTypeName();
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
new file mode 100644
index 0000000000..b131277bd7
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalog.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.Projects;
+import com.aliyun.odps.Tables;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
+import static org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
+
+/**
+ * {@link Catalog} backed by Alibaba Cloud MaxCompute (ODPS). A MaxCompute project is exposed
+ * as a database.
+ *
+ * <p>Only existence checks and table listing are implemented; {@link #getTable} and all DDL
+ * operations throw {@link UnsupportedOperationException}.
+ */
+@Slf4j
+public class MaxComputeCatalog implements Catalog {
+
+    private final ReadonlyConfig readonlyConfig;
+    private final String catalogName;
+
+    /** Credentials created by {@link #open()}; {@code null} until the catalog is opened. */
+    private Account account;
+
+    public MaxComputeCatalog(String catalogName, ReadonlyConfig options) {
+        this.readonlyConfig = options;
+        this.catalogName = catalogName;
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        account =
+                new AliyunAccount(readonlyConfig.get(ACCESS_ID), readonlyConfig.get(ACCESS_KEY));
+    }
+
+    @Override
+    public void close() throws CatalogException {
+        // Nothing to release: every operation builds its own Odps client via createOdps.
+    }
+
+    @Override
+    public String getDefaultDatabase() throws CatalogException {
+        return readonlyConfig.get(PROJECT);
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        Projects projects = createOdps(readonlyConfig.get(PROJECT)).projects();
+        try {
+            return projects.exists(databaseName);
+        } catch (OdpsException e) {
+            throw new CatalogException("Check " + databaseName + " exist error", e);
+        }
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        // TODO: enumerate every accessible project; only the configured one is reported now.
+        String project = readonlyConfig.get(PROJECT);
+        if (databaseExists(project)) {
+            return Lists.newArrayList(project);
+        }
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws CatalogException, DatabaseNotExistException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(catalogName, databaseName);
+        }
+        Tables tables = createOdps(databaseName).tables();
+        List<String> tableNames = new ArrayList<>();
+        try {
+            // Iterable.forEach cannot throw checked exceptions, so ODPS errors arrive unchecked.
+            tables.forEach(table -> tableNames.add(table.getName()));
+        } catch (RuntimeException e) {
+            throw new CatalogException("List tables of " + databaseName + " error", e);
+        }
+        return tableNames;
+    }
+
+    @Override
+    public boolean tableExists(TablePath tablePath) throws CatalogException {
+        Tables tables = createOdps(tablePath.getDatabaseName()).tables();
+        try {
+            return tables.exists(tablePath.getTableName());
+        } catch (OdpsException e) {
+            throw new CatalogException("Check table " + tablePath + " exist error", e);
+        }
+    }
+
+    @Override
+    public CatalogTable getTable(TablePath tablePath)
+            throws CatalogException, TableNotExistException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Builds an ODPS client for the configured endpoint, defaulting to {@code project}.
+     *
+     * @throws CatalogException if {@link #open()} has not been called yet
+     */
+    private Odps createOdps(String project) {
+        if (account == null) {
+            throw new CatalogException("Catalog " + catalogName + " is not opened");
+        }
+        Odps odps = new Odps(account);
+        odps.setEndpoint(readonlyConfig.get(ENDPOINT));
+        odps.setDefaultProject(project);
+        return odps;
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
similarity index 73%
copy from
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
copy to
seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
index f5b4fd03b7..4ed29c0ea6 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogFactory.java
@@ -15,40 +15,44 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import com.google.auto.service.AutoService;
+import static org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
@AutoService(Factory.class)
-public class MaxcomputeSourceFactory implements TableSourceFactory {
+public class MaxComputeCatalogFactory implements CatalogFactory {
+
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ return new MaxComputeCatalog(catalogName, options);
+ }
+
@Override
public String factoryIdentifier() {
- return "Maxcompute";
+ return PLUGIN_NAME;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
- .optional(PARTITION_SPEC, SPLIT_ROW)
+ .optional(PARTITION_SPEC, SPLIT_ROW, SCHEMA)
.build();
}
-
- @Override
- public Class<? extends SeaTunnelSource> getSourceClass() {
- return MaxcomputeSource.class;
- }
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
new file mode 100644
index 0000000000..4e48f938aa
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertor.java
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
+
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
+import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
+
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.type.ArrayTypeInfo;
+import com.aliyun.odps.type.DecimalTypeInfo;
+import com.aliyun.odps.type.MapTypeInfo;
+import com.aliyun.odps.type.StructTypeInfo;
+import com.aliyun.odps.type.TypeInfo;
+import com.aliyun.odps.type.TypeInfoFactory;
+import com.google.auto.service.AutoService;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts between MaxCompute (ODPS) column types and SeaTunnel data types.
+ *
+ * <p>A type can be given either as an ODPS {@link TypeInfo} or as an ODPS type name such as
+ * {@code MAP<STRING,BIGINT>} or {@code DECIMAL(10,2)}.
+ */
+@AutoService(DataTypeConvertor.class)
+public class MaxComputeDataTypeConvertor implements DataTypeConvertor<TypeInfo> {
+
+    /** Precision used for a bare {@code DECIMAL} type name. */
+    private static final int DEFAULT_DECIMAL_PRECISION = 54;
+
+    /** Scale used for a bare {@code DECIMAL} type name. */
+    private static final int DEFAULT_DECIMAL_SCALE = 18;
+
+    /**
+     * Parses an ODPS type name into a SeaTunnel type.
+     *
+     * <p>Type arguments are split only on top-level separators, so nested names such as {@code
+     * MAP<DECIMAL(10,2),STRING>} or {@code STRUCT<a:MAP<STRING,INT>,b:INT>} are supported.
+     *
+     * @param connectorDataType ODPS type name, e.g. {@code ARRAY<INT>}
+     * @return the corresponding SeaTunnel type
+     * @throws MaxcomputeConnectorException if the type, or an array element type, is unsupported
+     */
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
+        String typeName = connectorDataType.trim();
+        if (typeName.startsWith("MAP<")) {
+            // MAP<key,value>
+            List<String> keyValue = splitTopLevel(typeArguments(typeName, "MAP<", '>'), ',');
+            if (keyValue.size() != 2) {
+                throw unsupportedType(connectorDataType);
+            }
+            return new MapType(
+                    toSeaTunnelType(keyValue.get(0)), toSeaTunnelType(keyValue.get(1)));
+        }
+        if (typeName.startsWith("ARRAY<")) {
+            // ARRAY<element>
+            return toArrayType(toSeaTunnelType(typeArguments(typeName, "ARRAY<", '>')));
+        }
+        if (typeName.startsWith("STRUCT<")) {
+            // STRUCT<field1:type1,field2:type2...>
+            List<String> entries = splitTopLevel(typeArguments(typeName, "STRUCT<", '>'), ',');
+            String[] fieldNames = new String[entries.size()];
+            SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType<?>[entries.size()];
+            for (int i = 0; i < entries.size(); i++) {
+                String entry = entries.get(i);
+                // The first ':' ends the field name; a nested STRUCT type may contain more.
+                int colon = entry.indexOf(':');
+                if (colon <= 0) {
+                    throw unsupportedType(connectorDataType);
+                }
+                fieldNames[i] = entry.substring(0, colon).trim();
+                fieldTypes[i] = toSeaTunnelType(entry.substring(colon + 1));
+            }
+            return new SeaTunnelRowType(fieldNames, fieldTypes);
+        }
+        if (typeName.startsWith("DECIMAL(")) {
+            // DECIMAL(precision,scale), or DECIMAL(precision) with scale 0
+            List<String> args = splitTopLevel(typeArguments(typeName, "DECIMAL(", ')'), ',');
+            if (args.size() > 2) {
+                throw unsupportedType(connectorDataType);
+            }
+            int precision = Integer.parseInt(args.get(0));
+            int scale = args.size() == 2 ? Integer.parseInt(args.get(1)) : 0;
+            return new DecimalType(precision, scale);
+        }
+        if (typeName.startsWith("CHAR") || typeName.startsWith("VARCHAR")) {
+            // CHAR(n) or VARCHAR(n)
+            return BasicType.STRING_TYPE;
+        }
+        switch (typeName) {
+            case "TINYINT":
+                return BasicType.BYTE_TYPE;
+            case "SMALLINT":
+                return BasicType.SHORT_TYPE;
+            case "INT":
+                return BasicType.INT_TYPE;
+            case "BIGINT":
+                return BasicType.LONG_TYPE;
+            case "BINARY":
+                // Same mapping as the TypeInfo-based conversion below.
+                return PrimitiveByteArrayType.INSTANCE;
+            case "FLOAT":
+                return BasicType.FLOAT_TYPE;
+            case "DOUBLE":
+                return BasicType.DOUBLE_TYPE;
+            case "DECIMAL":
+                return new DecimalType(DEFAULT_DECIMAL_PRECISION, DEFAULT_DECIMAL_SCALE);
+            case "STRING":
+                return BasicType.STRING_TYPE;
+            case "DATE":
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case "DATETIME":
+            case "TIMESTAMP":
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case "TIME":
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case "BOOLEAN":
+                return BasicType.BOOLEAN_TYPE;
+            case "NULL":
+            case "VOID":
+                return BasicType.VOID_TYPE;
+            default:
+                throw unsupportedType(connectorDataType);
+        }
+    }
+
+    /**
+     * Converts an ODPS {@link TypeInfo} into a SeaTunnel type.
+     *
+     * @param connectorDataType ODPS column type
+     * @param dataTypeProperties passed through unchanged to nested conversions
+     * @return the corresponding SeaTunnel type
+     * @throws MaxcomputeConnectorException if the type, or an array element type, is unsupported
+     */
+    @Override
+    public SeaTunnelDataType<?> toSeaTunnelType(
+            TypeInfo connectorDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        switch (connectorDataType.getOdpsType()) {
+            case MAP:
+                MapTypeInfo mapTypeInfo = (MapTypeInfo) connectorDataType;
+                return new MapType(
+                        toSeaTunnelType(mapTypeInfo.getKeyTypeInfo(), dataTypeProperties),
+                        toSeaTunnelType(mapTypeInfo.getValueTypeInfo(), dataTypeProperties));
+            case ARRAY:
+                ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo) connectorDataType;
+                return toArrayType(
+                        toSeaTunnelType(arrayTypeInfo.getElementTypeInfo(), dataTypeProperties));
+            case STRUCT:
+                StructTypeInfo structTypeInfo = (StructTypeInfo) connectorDataType;
+                // Use the declared field names; getTypeName() would return each field's type.
+                List<String> fieldNames = structTypeInfo.getFieldNames();
+                List<TypeInfo> fieldTypeInfos = structTypeInfo.getFieldTypeInfos();
+                SeaTunnelDataType<?>[] fieldTypes =
+                        new SeaTunnelDataType<?>[fieldTypeInfos.size()];
+                for (int i = 0; i < fieldTypes.length; i++) {
+                    fieldTypes[i] = toSeaTunnelType(fieldTypeInfos.get(i), dataTypeProperties);
+                }
+                return new SeaTunnelRowType(fieldNames.toArray(new String[0]), fieldTypes);
+            case TINYINT:
+                return BasicType.BYTE_TYPE;
+            case SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case INT:
+                return BasicType.INT_TYPE;
+            case BIGINT:
+                return BasicType.LONG_TYPE;
+            case BINARY:
+                return PrimitiveByteArrayType.INSTANCE;
+            case FLOAT:
+                return BasicType.FLOAT_TYPE;
+            case DOUBLE:
+                return BasicType.DOUBLE_TYPE;
+            case DECIMAL:
+                DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) connectorDataType;
+                return new DecimalType(
+                        decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
+            case VARCHAR:
+            case CHAR:
+            case STRING:
+                return BasicType.STRING_TYPE;
+            case DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case DATETIME:
+            case TIMESTAMP:
+                // Both hold a date plus a time of day, so map to LocalDateTime, not LocalTime.
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case VOID:
+                return BasicType.VOID_TYPE;
+            case INTERVAL_DAY_TIME:
+            case INTERVAL_YEAR_MONTH:
+            default:
+                throw unsupportedType(connectorDataType.getTypeName());
+        }
+    }
+
+    /**
+     * Converts a SeaTunnel type into the ODPS {@link TypeInfo} of a MaxCompute column.
+     *
+     * @param seaTunnelDataType SeaTunnel type to convert
+     * @param dataTypeProperties passed through unchanged to nested conversions
+     * @return the corresponding ODPS type
+     * @throws MaxcomputeConnectorException if the SQL type has no mapping below
+     */
+    @Override
+    public TypeInfo toConnectorType(
+            SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> dataTypeProperties)
+            throws DataTypeConvertException {
+        switch (seaTunnelDataType.getSqlType()) {
+            case MAP:
+                MapType mapType = (MapType) seaTunnelDataType;
+                return TypeInfoFactory.getMapTypeInfo(
+                        toConnectorType(mapType.getKeyType(), dataTypeProperties),
+                        toConnectorType(mapType.getValueType(), dataTypeProperties));
+            case ARRAY:
+                ArrayType arrayType = (ArrayType) seaTunnelDataType;
+                return TypeInfoFactory.getArrayTypeInfo(
+                        toConnectorType(arrayType.getElementType(), dataTypeProperties));
+            case ROW:
+                SeaTunnelRowType rowType = (SeaTunnelRowType) seaTunnelDataType;
+                List<String> fieldNames = new ArrayList<>(rowType.getTotalFields());
+                List<TypeInfo> fieldTypes = new ArrayList<>(rowType.getTotalFields());
+                for (int i = 0; i < rowType.getTotalFields(); i++) {
+                    fieldNames.add(rowType.getFieldName(i));
+                    fieldTypes.add(toConnectorType(rowType.getFieldType(i), dataTypeProperties));
+                }
+                return TypeInfoFactory.getStructTypeInfo(fieldNames, fieldTypes);
+            case TINYINT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TINYINT);
+            case SMALLINT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.SMALLINT);
+            case INT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.INT);
+            case BIGINT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BIGINT);
+            case BYTES:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BINARY);
+            case FLOAT:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.FLOAT);
+            case DOUBLE:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DOUBLE);
+            case DECIMAL:
+                DecimalType decimalType = (DecimalType) seaTunnelDataType;
+                return TypeInfoFactory.getDecimalTypeInfo(
+                        decimalType.getPrecision(), decimalType.getScale());
+            case STRING:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.STRING);
+            case DATE:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATE);
+            case TIMESTAMP:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.TIMESTAMP);
+            case TIME:
+                // NOTE(review): no OdpsType for a bare time of day is used here, so TIME is
+                // widened to DATETIME - confirm this is the intended mapping.
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.DATETIME);
+            case BOOLEAN:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.BOOLEAN);
+            case NULL:
+                return TypeInfoFactory.getPrimitiveTypeInfo(OdpsType.VOID);
+            default:
+                throw new MaxcomputeConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        String.format(
+                                "Maxcompute type not support this type [%s] now",
+                                seaTunnelDataType.getSqlType()));
+        }
+    }
+
+    @Override
+    public String getIdentity() {
+        return MaxcomputeConfig.PLUGIN_NAME;
+    }
+
+    /** Maps a converted element type onto SeaTunnel's predefined primitive array types. */
+    private static SeaTunnelDataType<?> toArrayType(SeaTunnelDataType<?> elementType) {
+        switch (elementType.getSqlType()) {
+            case STRING:
+                return ArrayType.STRING_ARRAY_TYPE;
+            case BOOLEAN:
+                return ArrayType.BOOLEAN_ARRAY_TYPE;
+            case TINYINT:
+                return ArrayType.BYTE_ARRAY_TYPE;
+            case SMALLINT:
+                return ArrayType.SHORT_ARRAY_TYPE;
+            case INT:
+                return ArrayType.INT_ARRAY_TYPE;
+            case BIGINT:
+                return ArrayType.LONG_ARRAY_TYPE;
+            case FLOAT:
+                return ArrayType.FLOAT_ARRAY_TYPE;
+            case DOUBLE:
+                return ArrayType.DOUBLE_ARRAY_TYPE;
+            default:
+                throw new MaxcomputeConnectorException(
+                        CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        "Unsupported array element type: " + elementType);
+        }
+    }
+
+    /**
+     * Returns the text between {@code prefix} and the closing {@code closer} that must end {@code
+     * typeName}.
+     *
+     * @throws MaxcomputeConnectorException if {@code typeName} is not terminated by {@code closer}
+     */
+    private static String typeArguments(String typeName, String prefix, char closer) {
+        if (typeName.length() <= prefix.length()
+                || typeName.charAt(typeName.length() - 1) != closer) {
+            throw unsupportedType(typeName);
+        }
+        return typeName.substring(prefix.length(), typeName.length() - 1);
+    }
+
+    /**
+     * Splits {@code text} on {@code separator}, ignoring separators nested inside {@code <...>} or
+     * {@code (...)}, and trims every part.
+     */
+    private static List<String> splitTopLevel(String text, char separator) {
+        List<String> parts = new ArrayList<>();
+        int depth = 0;
+        int start = 0;
+        for (int i = 0; i < text.length(); i++) {
+            char c = text.charAt(i);
+            if (c == '<' || c == '(') {
+                depth++;
+            } else if (c == '>' || c == ')') {
+                depth--;
+            } else if (c == separator && depth == 0) {
+                parts.add(text.substring(start, i).trim());
+                start = i + 1;
+            }
+        }
+        parts.add(text.substring(start).trim());
+        return parts;
+    }
+
+    /** Builds the error raised for a MaxCompute type that SeaTunnel cannot represent. */
+    private static MaxcomputeConnectorException unsupportedType(String typeName) {
+        return new MaxcomputeConnectorException(
+                CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                String.format("SeaTunnel type not support this type [%s] now", typeName));
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
index 18aaafad41..84bbccddb7 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java
@@ -23,6 +23,9 @@ import org.apache.seatunnel.api.configuration.Options;
import java.io.Serializable;
public class MaxcomputeConfig implements Serializable {
+
+ public static final String PLUGIN_NAME = "Maxcompute";
+
private static final int SPLIT_ROW_DEFAULT = 10000;
public static final Option<String> ACCESS_ID =
Options.key("accessId")
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
index cf6d639ed1..31c5ab0791 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSink.java
@@ -34,6 +34,8 @@ import org.slf4j.LoggerFactory;
import com.google.auto.service.AutoService;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
+
@AutoService(SeaTunnelSink.class)
public class MaxcomputeSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
private static final Logger LOG =
LoggerFactory.getLogger(MaxcomputeSink.class);
@@ -42,7 +44,7 @@ public class MaxcomputeSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public String getPluginName() {
- return "Maxcompute";
+ return PLUGIN_NAME;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
index 5fbf1608d3..faa2f62910 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeSinkFactory.java
@@ -28,6 +28,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.Maxcom
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.OVERWRITE;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
@@ -35,7 +36,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.Maxcom
public class MaxcomputeSinkFactory implements TableSinkFactory {
@Override
public String factoryIdentifier() {
- return "Maxcompute";
+ return PLUGIN_NAME;
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
index 3f6dd6cc76..417d660b86 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
@@ -23,7 +23,9 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeMapper;
@@ -31,22 +33,30 @@ import
org.apache.seatunnel.connectors.seatunnel.maxcompute.util.MaxcomputeTypeM
import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
+import static org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
+
@Slf4j
@AutoService(SeaTunnelSource.class)
public class MaxcomputeSource
implements SeaTunnelSource<SeaTunnelRow, MaxcomputeSourceSplit,
MaxcomputeSourceState>,
- SupportParallelism {
+ SupportParallelism,
+ SupportColumnProjection {
private SeaTunnelRowType typeInfo;
private Config pluginConfig;
@Override
public String getPluginName() {
- return "Maxcompute";
+ return PLUGIN_NAME;
}
@Override
public void prepare(Config pluginConfig) {
- this.typeInfo = MaxcomputeTypeMapper.getSeaTunnelRowType(pluginConfig);
+ if (pluginConfig.hasPath(SCHEMA.key())) {
+ this.typeInfo =
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
+ } else {
+ this.typeInfo =
MaxcomputeTypeMapper.getSeaTunnelRowType(pluginConfig);
+ }
this.pluginConfig = pluginConfig;
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
index f5b4fd03b7..a1600f106d 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceFactory.java
@@ -24,10 +24,12 @@ import
org.apache.seatunnel.api.table.factory.TableSourceFactory;
import com.google.auto.service.AutoService;
+import static org.apache.seatunnel.api.table.catalog.CatalogTableUtil.SCHEMA;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_ID;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ACCESS_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.ENDPOINT;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PARTITION_SPEC;
+import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PLUGIN_NAME;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.PROJECT;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.SPLIT_ROW;
import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig.TABLE_NAME;
@@ -36,14 +38,14 @@ import static
org.apache.seatunnel.connectors.seatunnel.maxcompute.config.Maxcom
public class MaxcomputeSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
- return "Maxcompute";
+ return PLUGIN_NAME;
}
@Override
public OptionRule optionRule() {
return OptionRule.builder()
.required(ACCESS_ID, ACCESS_KEY, ENDPOINT, PROJECT, TABLE_NAME)
- .optional(PARTITION_SPEC, SPLIT_ROW)
+ .optional(PARTITION_SPEC, SPLIT_ROW, SCHEMA)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
index ac77214015..8e9eaf0785 100644
---
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/util/MaxcomputeTypeMapper.java
@@ -20,15 +20,12 @@ package
org.apache.seatunnel.connectors.seatunnel.maxcompute.util;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.table.type.ArrayType;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeDataTypeConvertor;
import
org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException;
import com.aliyun.odps.Column;
@@ -41,11 +38,7 @@ import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.SimpleStruct;
import com.aliyun.odps.data.Varchar;
import com.aliyun.odps.type.ArrayTypeInfo;
-import com.aliyun.odps.type.DecimalTypeInfo;
import com.aliyun.odps.type.MapTypeInfo;
-import com.aliyun.odps.type.SimpleArrayTypeInfo;
-import com.aliyun.odps.type.SimpleMapTypeInfo;
-import com.aliyun.odps.type.SimpleStructTypeInfo;
import com.aliyun.odps.type.StructTypeInfo;
import com.aliyun.odps.type.TypeInfo;
import lombok.extern.slf4j.Slf4j;
@@ -67,9 +60,9 @@ public class MaxcomputeTypeMapper implements Serializable {
public static SeaTunnelRow getSeaTunnelRowData(Record rs, SeaTunnelRowType
typeInfo) {
List<Object> fields = new ArrayList<>();
- SeaTunnelDataType<?>[] seaTunnelDataTypes = typeInfo.getFieldTypes();
- for (int i = 0; i < rs.getColumns().length; i++) {
- fields.add(resolveObject2SeaTunnel(rs.get(i),
seaTunnelDataTypes[i]));
+ for (int i = 0; i < typeInfo.getTotalFields(); i++) {
+ String typeName = typeInfo.getFieldName(i);
+ fields.add(resolveObject2SeaTunnel(rs.get(typeName),
typeInfo.getFieldType(i)));
}
return new SeaTunnelRow(fields.toArray());
}
@@ -92,11 +85,12 @@ public class MaxcomputeTypeMapper implements Serializable {
ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
ArrayList<String> fieldNames = new ArrayList<>();
try {
+ MaxComputeDataTypeConvertor typeConvertor = new
MaxComputeDataTypeConvertor();
for (int i = 0; i < tableSchema.getColumns().size(); i++) {
fieldNames.add(tableSchema.getColumns().get(i).getName());
TypeInfo maxcomputeTypeInfo =
tableSchema.getColumns().get(i).getTypeInfo();
SeaTunnelDataType<?> seaTunnelDataType =
- maxcompute2SeaTunnelType(maxcomputeTypeInfo);
+ typeConvertor.toSeaTunnelType(maxcomputeTypeInfo,
null);
seaTunnelDataTypes.add(seaTunnelDataType);
}
} catch (Exception e) {
@@ -107,88 +101,6 @@ public class MaxcomputeTypeMapper implements Serializable {
seaTunnelDataTypes.toArray(new
SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
}
- private static SeaTunnelDataType<?> maxcompute2SeaTunnelType(TypeInfo
typeInfo) {
- switch (typeInfo.getOdpsType()) {
- case MAP:
- MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
- return new MapType(
- maxcompute2SeaTunnelType(mapTypeInfo.getKeyTypeInfo()),
-
maxcompute2SeaTunnelType(mapTypeInfo.getValueTypeInfo()));
- case ARRAY:
- ArrayTypeInfo arrayTypeInfo = (ArrayTypeInfo) typeInfo;
- switch (arrayTypeInfo.getElementTypeInfo().getOdpsType()) {
- case BOOLEAN:
- return ArrayType.BOOLEAN_ARRAY_TYPE;
- case INT:
- return ArrayType.INT_ARRAY_TYPE;
- case BIGINT:
- return ArrayType.LONG_ARRAY_TYPE;
- case FLOAT:
- return ArrayType.FLOAT_ARRAY_TYPE;
- case DOUBLE:
- return ArrayType.DOUBLE_ARRAY_TYPE;
- case STRING:
- return ArrayType.STRING_ARRAY_TYPE;
- default:
- throw new MaxcomputeConnectorException(
- CommonErrorCode.UNSUPPORTED_DATA_TYPE,
- String.format(
- "SeaTunnel type not support this type
[%s] now",
- typeInfo.getTypeName()));
- }
- case STRUCT:
- StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
- List<TypeInfo> fields = structTypeInfo.getFieldTypeInfos();
- List<String> fieldNames = new ArrayList<>(fields.size());
- List<SeaTunnelDataType<?>> fieldTypes = new
ArrayList<>(fields.size());
- for (TypeInfo field : fields) {
- fieldNames.add(field.getTypeName());
- fieldTypes.add(maxcompute2SeaTunnelType(field));
- }
- return new SeaTunnelRowType(
- fieldNames.toArray(new String[0]),
- fieldTypes.toArray(new SeaTunnelDataType[0]));
- case TINYINT:
- return BasicType.BYTE_TYPE;
- case SMALLINT:
- return BasicType.SHORT_TYPE;
- case INT:
- return BasicType.INT_TYPE;
- case BIGINT:
- return BasicType.LONG_TYPE;
- case BINARY:
- return PrimitiveByteArrayType.INSTANCE;
- case FLOAT:
- return BasicType.FLOAT_TYPE;
- case DOUBLE:
- return BasicType.DOUBLE_TYPE;
- case DECIMAL:
- DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) typeInfo;
- return new DecimalType(decimalTypeInfo.getPrecision(),
decimalTypeInfo.getScale());
- case VARCHAR:
- case CHAR:
- case STRING:
- return BasicType.STRING_TYPE;
- case DATE:
- return LocalTimeType.LOCAL_DATE_TYPE;
- case DATETIME:
- case TIMESTAMP:
- return LocalTimeType.LOCAL_DATE_TIME_TYPE;
- case BOOLEAN:
- return BasicType.BOOLEAN_TYPE;
- case VOID:
- return BasicType.VOID_TYPE;
- case INTERVAL_DAY_TIME:
- case INTERVAL_YEAR_MONTH:
- default:
- throw new MaxcomputeConnectorException(
- CommonErrorCode.UNSUPPORTED_DATA_TYPE,
- String.format(
- "SeaTunnel type not support this type [%s]
now",
- typeInfo.getTypeName()));
- }
- }
-
private static Object resolveObject2SeaTunnel(Object field,
SeaTunnelDataType<?> fieldType) {
if (field == null) {
return null;
@@ -245,11 +157,10 @@ public class MaxcomputeTypeMapper implements Serializable
{
case DOUBLE:
case BIGINT:
case BOOLEAN:
+ case DECIMAL:
return field;
case BYTES:
return ((Binary) field).data();
- case DECIMAL:
- return null;
case STRING:
if (field instanceof byte[]) {
return new String((byte[]) field);
@@ -288,7 +199,7 @@ public class MaxcomputeTypeMapper implements Serializable {
case ARRAY:
ArrayList<Object> origArray = new ArrayList<>();
Arrays.stream((Object[])
field).iterator().forEachRemaining(origArray::add);
- switch (((SimpleArrayTypeInfo)
typeInfo).getElementTypeInfo().getOdpsType()) {
+ switch (((ArrayTypeInfo)
typeInfo).getElementTypeInfo().getOdpsType()) {
case STRING:
case BOOLEAN:
case INT:
@@ -305,8 +216,8 @@ public class MaxcomputeTypeMapper implements Serializable {
}
case MAP:
HashMap<Object, Object> dataMap = new HashMap<>();
- TypeInfo keyTypeInfo = ((SimpleMapTypeInfo)
typeInfo).getKeyTypeInfo();
- TypeInfo valueTypeInfo = ((SimpleMapTypeInfo)
typeInfo).getValueTypeInfo();
+ TypeInfo keyTypeInfo = ((MapTypeInfo)
typeInfo).getKeyTypeInfo();
+ TypeInfo valueTypeInfo = ((MapTypeInfo)
typeInfo).getValueTypeInfo();
HashMap<Object, Object> origDataMap = (HashMap<Object,
Object>) field;
origDataMap.forEach(
(key, value) ->
@@ -316,7 +227,7 @@ public class MaxcomputeTypeMapper implements Serializable {
return origDataMap;
case STRUCT:
Object[] fields = ((SeaTunnelRow) field).getFields();
- List<TypeInfo> typeInfos = ((SimpleStructTypeInfo)
typeInfo).getFieldTypeInfos();
+ List<TypeInfo> typeInfos = ((StructTypeInfo)
typeInfo).getFieldTypeInfos();
ArrayList<Object> origStruct = new ArrayList<>();
for (int i = 0; i < fields.length; i++) {
origStruct.add(resolveObject2Maxcompute(fields[i],
typeInfos.get(i)));
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
new file mode 100644
index 0000000000..0af30301ca
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeDataTypeConvertorTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.connectors.seatunnel.maxcompute.config.MaxcomputeConfig;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.aliyun.odps.OdpsType;
+import com.aliyun.odps.type.MapTypeInfo;
+import com.aliyun.odps.type.TypeInfoFactory;
+import com.aliyun.odps.type.VarcharTypeInfo;
+
+/** Unit tests for {@link MaxComputeDataTypeConvertor}. */
+public class MaxComputeDataTypeConvertorTest {
+
+    private final MaxComputeDataTypeConvertor maxComputeDataTypeConvertor =
+            new MaxComputeDataTypeConvertor();
+
+    /** Parses a MaxCompute type string and checks both the map key and value types. */
+    @Test
+    public void testTypeInfoStrToSeaTunnelType() {
+        String typeInfoStr = "MAP<STRING,STRING>";
+        SeaTunnelDataType<?> seaTunnelType =
+                maxComputeDataTypeConvertor.toSeaTunnelType(typeInfoStr);
+        Assertions.assertEquals(BasicType.STRING_TYPE, ((MapType) seaTunnelType).getKeyType());
+        Assertions.assertEquals(BasicType.STRING_TYPE, ((MapType) seaTunnelType).getValueType());
+    }
+
+    /** MaxCompute VARCHAR map keys and values convert to the SeaTunnel STRING type. */
+    @Test
+    public void testTypeInfoToSeaTunnelType() {
+        MapTypeInfo simpleMapTypeInfo =
+                TypeInfoFactory.getMapTypeInfo(new VarcharTypeInfo(10), new VarcharTypeInfo(10));
+        MapType seaTunnelMapType =
+                (MapType) maxComputeDataTypeConvertor.toSeaTunnelType(simpleMapTypeInfo, null);
+        Assertions.assertEquals(BasicType.STRING_TYPE, seaTunnelMapType.getKeyType());
+        Assertions.assertEquals(BasicType.STRING_TYPE, seaTunnelMapType.getValueType());
+    }
+
+    /** A SeaTunnel map of STRING converts to a MaxCompute map with STRING key and value. */
+    @Test
+    public void testSeaTunnelTypeToTypeInfo() {
+        MapType mapType = new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE);
+        MapTypeInfo mapTypeInfo =
+                (MapTypeInfo) maxComputeDataTypeConvertor.toConnectorType(mapType, null);
+        Assertions.assertEquals(OdpsType.STRING, mapTypeInfo.getKeyTypeInfo().getOdpsType());
+        Assertions.assertEquals(OdpsType.STRING, mapTypeInfo.getValueTypeInfo().getOdpsType());
+    }
+
+    /** The convertor reports the connector's plugin name as its identity. */
+    @Test
+    public void getIdentity() {
+        Assertions.assertEquals(
+                MaxcomputeConfig.PLUGIN_NAME, maxComputeDataTypeConvertor.getIdentity());
+    }
+}
diff --git
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java
new file mode 100644
index 0000000000..9d394775d1
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.maxcompute.source;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link MaxcomputeSource}. */
+public class MaxcomputeSourceTest {
+
+    /**
+     * With a {@code schema} block configured, {@link MaxcomputeSource#prepare} builds the
+     * produced row type from it; no connection options are set, so only that branch runs.
+     */
+    @Test
+    public void prepare() {
+        Config fields =
+                ConfigFactory.empty()
+                        .withValue("id", ConfigValueFactory.fromAnyRef("int"))
+                        .withValue("name", ConfigValueFactory.fromAnyRef("string"))
+                        .withValue("age", ConfigValueFactory.fromAnyRef("int"));
+
+        Config schema = fields.atKey("fields").atKey("schema");
+
+        MaxcomputeSource maxcomputeSource = new MaxcomputeSource();
+        Assertions.assertDoesNotThrow(() -> maxcomputeSource.prepare(schema));
+
+        SeaTunnelRowType seaTunnelRowType = maxcomputeSource.getProducedType();
+        Assertions.assertEquals(3, seaTunnelRowType.getTotalFields());
+        assertFieldType(seaTunnelRowType, "id", SqlType.INT);
+        assertFieldType(seaTunnelRowType, "name", SqlType.STRING);
+        assertFieldType(seaTunnelRowType, "age", SqlType.INT);
+    }
+
+    /** Asserts that the field named {@code fieldName} has the {@code expected} SQL type. */
+    private static void assertFieldType(
+            SeaTunnelRowType rowType, String fieldName, SqlType expected) {
+        // Resolve by name so the assertion does not depend on field position.
+        int index = rowType.indexOf(fieldName);
+        Assertions.assertEquals(expected, rowType.getFieldType(index).getSqlType(), fieldName);
+    }
+}