This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a24fa8fef6 [Fix][Connector-V2] Fix MaxCompute cannot get project and tableName when use schema (#8865)
a24fa8fef6 is described below
commit a24fa8fef6a9ea45f985db2d1f5b77ff18748ece
Author: Jia Fan <[email protected]>
AuthorDate: Sat Mar 1 21:42:58 2025 +0800
[Fix][Connector-V2] Fix MaxCompute cannot get project and tableName when
use schema (#8865)
---
.../maxcompute/source/MaxcomputeSource.java | 24 +++++++++---
.../maxcompute/source/MaxcomputeSourceTest.java | 43 ++++++++++++++++++++--
2 files changed, 57 insertions(+), 10 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
index ddbf3dfa56..824585105a 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
@@ -27,6 +27,7 @@ import
org.apache.seatunnel.api.source.SupportColumnProjection;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
@@ -69,6 +70,13 @@ public class MaxcomputeSource
if
(readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(readonlyConfig);
+ catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of(
+ "maxcompute",
+ readonlyConfig.get(PROJECT),
+ readonlyConfig.get(TABLE_NAME)),
+ catalogTable);
tables.put(
catalogTable.getTablePath(),
new SourceTableInfo(
@@ -81,11 +89,21 @@ public class MaxcomputeSource
if (readonlyConfig.getOptional(TABLE_LIST).isPresent()) {
for (Map<String, Object> subConfig :
readonlyConfig.get(TABLE_LIST)) {
ReadonlyConfig subReadonlyConfig =
ReadonlyConfig.fromMap(subConfig);
+ String project =
+ subReadonlyConfig
+ .getOptional(PROJECT)
+ .orElse(readonlyConfig.get(PROJECT));
+ TablePath tablePath =
+ TablePath.of(project,
subReadonlyConfig.get(TABLE_NAME));
if (subReadonlyConfig
.getOptional(ConnectorCommonOptions.SCHEMA)
.isPresent()) {
CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(subReadonlyConfig);
+ catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of("maxcompute",
tablePath),
+ catalogTable);
tables.put(
catalogTable.getTablePath(),
new SourceTableInfo(
@@ -93,16 +111,10 @@ public class MaxcomputeSource
subReadonlyConfig.get(PARTITION_SPEC),
subReadonlyConfig.get(SPLIT_ROW)));
} else {
- String project =
- subReadonlyConfig
- .getOptional(PROJECT)
-
.orElse(readonlyConfig.get(PROJECT));
Integer splitRow =
subReadonlyConfig
.getOptional(SPLIT_ROW)
.orElse(readonlyConfig.get(SPLIT_ROW));
- TablePath tablePath =
- TablePath.of(project,
subReadonlyConfig.get(TABLE_NAME));
tables.put(
tablePath,
new SourceTableInfo(
diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java
index 354b3c52d0..ede4f9dca4 100644
--- a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java
+++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java
@@ -22,16 +22,21 @@ import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
public class MaxcomputeSourceTest {
@Test
- public void prepare() {
+ public void testParseSchema() {
Config fields =
ConfigFactory.empty()
.withValue("id", ConfigValueFactory.fromAnyRef("int"))
@@ -40,10 +45,40 @@ public class MaxcomputeSourceTest {
Config schema = fields.atKey("fields").atKey("schema");
- MaxcomputeSource maxcomputeSource = new
MaxcomputeSource(ReadonlyConfig.fromConfig(schema));
+ Config root =
+ schema.withValue("project",
ConfigValueFactory.fromAnyRef("project"))
+ .withValue("table_name",
ConfigValueFactory.fromAnyRef("test_table"));
+
+ MaxcomputeSource maxcomputeSource = new
MaxcomputeSource(ReadonlyConfig.fromConfig(root));
- SeaTunnelRowType seaTunnelRowType =
-
maxcomputeSource.getProducedCatalogTables().get(0).getSeaTunnelRowType();
+ CatalogTable table =
maxcomputeSource.getProducedCatalogTables().get(0);
+ Assertions.assertEquals("project.test_table",
table.getTablePath().toString());
+ SeaTunnelRowType seaTunnelRowType = table.getSeaTunnelRowType();
Assertions.assertEquals(SqlType.INT,
seaTunnelRowType.getFieldType(0).getSqlType());
+
+ Map<String, Object> tableList = new HashMap<>();
+ Map<String, Object> schemaMap = new HashMap<>();
+ Map<String, Object> fieldsMap = new HashMap<>();
+ fieldsMap.put("id", "int");
+ fieldsMap.put("name", "string");
+ fieldsMap.put("age", "int");
+ schemaMap.put("fields", fieldsMap);
+ tableList.put("schema", schemaMap);
+ tableList.put("table_name", "test_table2");
+
+ root =
+ ConfigFactory.empty()
+ .withValue("project",
ConfigValueFactory.fromAnyRef("project"))
+ .withValue("accessId",
ConfigValueFactory.fromAnyRef("accessId"))
+ .withValue("accesskey",
ConfigValueFactory.fromAnyRef("accessKey"))
+ .withValue(
+ "table_list",
+ ConfigValueFactory.fromIterable(
+ Collections.singletonList(tableList)));
+
+ maxcomputeSource = new
MaxcomputeSource(ReadonlyConfig.fromConfig(root));
+
+ table = maxcomputeSource.getProducedCatalogTables().get(0);
+ Assertions.assertEquals("project.test_table2",
table.getTablePath().toString());
}
}