This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new df1954d520 [Hotfix][Jdbc] Fix table/query columns order merge for jdbc 
catalog (#6771)
df1954d520 is described below

commit df1954d52076c7ba6d3d764c9e4e58725193a102
Author: hailin0 <[email protected]>
AuthorDate: Mon May 6 15:12:41 2024 +0800

    [Hotfix][Jdbc] Fix table/query columns order merge for jdbc catalog (#6771)
---
 .../seatunnel/jdbc/utils/JdbcCatalogUtils.java     | 12 +++++-
 .../seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java | 44 ++++++++++++++++------
 2 files changed, 44 insertions(+), 12 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index 413305988a..a11ec3d74d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -233,7 +233,17 @@ public class JdbcCatalogUtils {
         boolean schemaEquals =
                 schemaIncludeAllColumns && columnsOfMerge.size() == 
columnsOfPath.size();
         if (schemaEquals) {
-            return tableOfPath;
+            // Reorder the field list
+            return CatalogTable.of(
+                    tableOfPath.getTableId(),
+                    TableSchema.builder()
+                            .primaryKey(tableSchemaOfPath.getPrimaryKey())
+                            
.constraintKey(tableSchemaOfPath.getConstraintKeys())
+                            .columns(columnsOfMerge)
+                            .build(),
+                    tableOfPath.getOptions(),
+                    tableOfPath.getPartitionKeys(),
+                    tableOfPath.getComment());
         }
 
         PrimaryKey primaryKeyOfPath = tableSchemaOfPath.getPrimaryKey();
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
index 5fbc658a5e..4162bce30b 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
 
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
 import org.apache.seatunnel.api.table.catalog.ConstraintKey;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PrimaryKey;
@@ -30,6 +31,8 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 public class JdbcCatalogUtilsTest {
@@ -103,9 +106,9 @@ public class JdbcCatalogUtilsTest {
                         TableSchema.builder()
                                 .column(
                                         PhysicalColumn.of(
-                                                "f1",
-                                                BasicType.LONG_TYPE,
-                                                null,
+                                                "f2",
+                                                BasicType.STRING_TYPE,
+                                                10,
                                                 true,
                                                 null,
                                                 null,
@@ -117,10 +120,10 @@ public class JdbcCatalogUtilsTest {
                                                 null))
                                 .column(
                                         PhysicalColumn.of(
-                                                "f2",
+                                                "f3",
                                                 BasicType.STRING_TYPE,
-                                                10,
-                                                true,
+                                                20,
+                                                false,
                                                 null,
                                                 null,
                                                 null,
@@ -131,10 +134,10 @@ public class JdbcCatalogUtilsTest {
                                                 null))
                                 .column(
                                         PhysicalColumn.of(
-                                                "f3",
-                                                BasicType.STRING_TYPE,
-                                                20,
-                                                false,
+                                                "f1",
+                                                BasicType.LONG_TYPE,
+                                                null,
+                                                true,
                                                 null,
                                                 null,
                                                 null,
@@ -149,7 +152,26 @@ public class JdbcCatalogUtilsTest {
                         null);
 
         CatalogTable mergeTable = 
JdbcCatalogUtils.mergeCatalogTable(DEFAULT_TABLE, tableOfQuery);
-        Assertions.assertEquals(DEFAULT_TABLE, mergeTable);
+        Assertions.assertEquals(DEFAULT_TABLE.getTableId(), 
mergeTable.getTableId());
+        Assertions.assertEquals(DEFAULT_TABLE.getOptions(), 
mergeTable.getOptions());
+        Assertions.assertEquals(DEFAULT_TABLE.getComment(), 
mergeTable.getComment());
+        Assertions.assertEquals(DEFAULT_TABLE.getCatalogName(), 
mergeTable.getCatalogName());
+        Assertions.assertNotEquals(DEFAULT_TABLE.getTableSchema(), 
mergeTable.getTableSchema());
+        Assertions.assertEquals(
+                DEFAULT_TABLE.getTableSchema().getPrimaryKey(),
+                mergeTable.getTableSchema().getPrimaryKey());
+        Assertions.assertEquals(
+                DEFAULT_TABLE.getTableSchema().getConstraintKeys(),
+                mergeTable.getTableSchema().getConstraintKeys());
+
+        Map<String, Column> columnMap =
+                DEFAULT_TABLE.getTableSchema().getColumns().stream()
+                        .collect(Collectors.toMap(e -> e.getName(), e -> e));
+        List<Column> sortByQueryColumns =
+                tableOfQuery.getTableSchema().getColumns().stream()
+                        .map(e -> columnMap.get(e.getName()))
+                        .collect(Collectors.toList());
+        Assertions.assertEquals(sortByQueryColumns, 
mergeTable.getTableSchema().getColumns());
     }
 
     @Test

Reply via email to