This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a24fa8fef6 [Fix][Connector-V2] Fix MaxCompute cannot get project and 
tableName when use schema (#8865)
a24fa8fef6 is described below

commit a24fa8fef6a9ea45f985db2d1f5b77ff18748ece
Author: Jia Fan <[email protected]>
AuthorDate: Sat Mar 1 21:42:58 2025 +0800

    [Fix][Connector-V2] Fix MaxCompute cannot get project and tableName when 
use schema (#8865)
---
 .../maxcompute/source/MaxcomputeSource.java        | 24 +++++++++---
 .../maxcompute/source/MaxcomputeSourceTest.java    | 43 ++++++++++++++++++++--
 2 files changed, 57 insertions(+), 10 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
index ddbf3dfa56..824585105a 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSource.java
@@ -27,6 +27,7 @@ import 
org.apache.seatunnel.api.source.SupportColumnProjection;
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import 
org.apache.seatunnel.connectors.seatunnel.maxcompute.catalog.MaxComputeCatalog;
@@ -69,6 +70,13 @@ public class MaxcomputeSource
 
         if 
(readonlyConfig.getOptional(ConnectorCommonOptions.SCHEMA).isPresent()) {
             CatalogTable catalogTable = 
CatalogTableUtil.buildWithConfig(readonlyConfig);
+            catalogTable =
+                    CatalogTable.of(
+                            TableIdentifier.of(
+                                    "maxcompute",
+                                    readonlyConfig.get(PROJECT),
+                                    readonlyConfig.get(TABLE_NAME)),
+                            catalogTable);
             tables.put(
                     catalogTable.getTablePath(),
                     new SourceTableInfo(
@@ -81,11 +89,21 @@ public class MaxcomputeSource
                 if (readonlyConfig.getOptional(TABLE_LIST).isPresent()) {
                     for (Map<String, Object> subConfig : 
readonlyConfig.get(TABLE_LIST)) {
                         ReadonlyConfig subReadonlyConfig = 
ReadonlyConfig.fromMap(subConfig);
+                        String project =
+                                subReadonlyConfig
+                                        .getOptional(PROJECT)
+                                        .orElse(readonlyConfig.get(PROJECT));
+                        TablePath tablePath =
+                                TablePath.of(project, 
subReadonlyConfig.get(TABLE_NAME));
                         if (subReadonlyConfig
                                 .getOptional(ConnectorCommonOptions.SCHEMA)
                                 .isPresent()) {
                             CatalogTable catalogTable =
                                     
CatalogTableUtil.buildWithConfig(subReadonlyConfig);
+                            catalogTable =
+                                    CatalogTable.of(
+                                            TableIdentifier.of("maxcompute", 
tablePath),
+                                            catalogTable);
                             tables.put(
                                     catalogTable.getTablePath(),
                                     new SourceTableInfo(
@@ -93,16 +111,10 @@ public class MaxcomputeSource
                                             
subReadonlyConfig.get(PARTITION_SPEC),
                                             subReadonlyConfig.get(SPLIT_ROW)));
                         } else {
-                            String project =
-                                    subReadonlyConfig
-                                            .getOptional(PROJECT)
-                                            
.orElse(readonlyConfig.get(PROJECT));
                             Integer splitRow =
                                     subReadonlyConfig
                                             .getOptional(SPLIT_ROW)
                                             
.orElse(readonlyConfig.get(SPLIT_ROW));
-                            TablePath tablePath =
-                                    TablePath.of(project, 
subReadonlyConfig.get(TABLE_NAME));
                             tables.put(
                                     tablePath,
                                     new SourceTableInfo(
diff --git 
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java
 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java
index 354b3c52d0..ede4f9dca4 100644
--- 
a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java
+++ 
b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/source/MaxcomputeSourceTest.java
@@ -22,16 +22,21 @@ import 
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 public class MaxcomputeSourceTest {
 
     @Test
-    public void prepare() {
+    public void testParseSchema() {
         Config fields =
                 ConfigFactory.empty()
                         .withValue("id", ConfigValueFactory.fromAnyRef("int"))
@@ -40,10 +45,40 @@ public class MaxcomputeSourceTest {
 
         Config schema = fields.atKey("fields").atKey("schema");
 
-        MaxcomputeSource maxcomputeSource = new 
MaxcomputeSource(ReadonlyConfig.fromConfig(schema));
+        Config root =
+                schema.withValue("project", 
ConfigValueFactory.fromAnyRef("project"))
+                        .withValue("table_name", 
ConfigValueFactory.fromAnyRef("test_table"));
+
+        MaxcomputeSource maxcomputeSource = new 
MaxcomputeSource(ReadonlyConfig.fromConfig(root));
 
-        SeaTunnelRowType seaTunnelRowType =
-                
maxcomputeSource.getProducedCatalogTables().get(0).getSeaTunnelRowType();
+        CatalogTable table = 
maxcomputeSource.getProducedCatalogTables().get(0);
+        Assertions.assertEquals("project.test_table", 
table.getTablePath().toString());
+        SeaTunnelRowType seaTunnelRowType = table.getSeaTunnelRowType();
         Assertions.assertEquals(SqlType.INT, 
seaTunnelRowType.getFieldType(0).getSqlType());
+
+        Map<String, Object> tableList = new HashMap<>();
+        Map<String, Object> schemaMap = new HashMap<>();
+        Map<String, Object> fieldsMap = new HashMap<>();
+        fieldsMap.put("id", "int");
+        fieldsMap.put("name", "string");
+        fieldsMap.put("age", "int");
+        schemaMap.put("fields", fieldsMap);
+        tableList.put("schema", schemaMap);
+        tableList.put("table_name", "test_table2");
+
+        root =
+                ConfigFactory.empty()
+                        .withValue("project", 
ConfigValueFactory.fromAnyRef("project"))
+                        .withValue("accessId", 
ConfigValueFactory.fromAnyRef("accessId"))
+                        .withValue("accesskey", 
ConfigValueFactory.fromAnyRef("accessKey"))
+                        .withValue(
+                                "table_list",
+                                ConfigValueFactory.fromIterable(
+                                        Collections.singletonList(tableList)));
+
+        maxcomputeSource = new 
MaxcomputeSource(ReadonlyConfig.fromConfig(root));
+
+        table = maxcomputeSource.getProducedCatalogTables().get(0);
+        Assertions.assertEquals("project.test_table2", 
table.getTablePath().toString());
     }
 }

Reply via email to