This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new df1954d520 [Hotfix][Jdbc] Fix table/query columns order merge for jdbc catalog (#6771)
df1954d520 is described below
commit df1954d52076c7ba6d3d764c9e4e58725193a102
Author: hailin0 <[email protected]>
AuthorDate: Mon May 6 15:12:41 2024 +0800
[Hotfix][Jdbc] Fix table/query columns order merge for jdbc catalog (#6771)
---
.../seatunnel/jdbc/utils/JdbcCatalogUtils.java | 12 +++++-
.../seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java | 44 ++++++++++++++++------
2 files changed, 44 insertions(+), 12 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index 413305988a..a11ec3d74d 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -233,7 +233,17 @@ public class JdbcCatalogUtils {
boolean schemaEquals =
schemaIncludeAllColumns && columnsOfMerge.size() == columnsOfPath.size();
if (schemaEquals) {
- return tableOfPath;
+ // Reorder the field list
+ return CatalogTable.of(
+ tableOfPath.getTableId(),
+ TableSchema.builder()
+ .primaryKey(tableSchemaOfPath.getPrimaryKey())
+ .constraintKey(tableSchemaOfPath.getConstraintKeys())
+ .columns(columnsOfMerge)
+ .build(),
+ tableOfPath.getOptions(),
+ tableOfPath.getPartitionKeys(),
+ tableOfPath.getComment());
}
PrimaryKey primaryKeyOfPath = tableSchemaOfPath.getPrimaryKey();
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
index 5fbc658a5e..4162bce30b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
@@ -30,6 +31,8 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
public class JdbcCatalogUtilsTest {
@@ -103,9 +106,9 @@ public class JdbcCatalogUtilsTest {
TableSchema.builder()
.column(
PhysicalColumn.of(
- "f1",
- BasicType.LONG_TYPE,
- null,
+ "f2",
+ BasicType.STRING_TYPE,
+ 10,
true,
null,
null,
@@ -117,10 +120,10 @@ public class JdbcCatalogUtilsTest {
null))
.column(
PhysicalColumn.of(
- "f2",
+ "f3",
BasicType.STRING_TYPE,
- 10,
- true,
+ 20,
+ false,
null,
null,
null,
@@ -131,10 +134,10 @@ public class JdbcCatalogUtilsTest {
null))
.column(
PhysicalColumn.of(
- "f3",
- BasicType.STRING_TYPE,
- 20,
- false,
+ "f1",
+ BasicType.LONG_TYPE,
+ null,
+ true,
null,
null,
null,
@@ -149,7 +152,26 @@ public class JdbcCatalogUtilsTest {
null);
CatalogTable mergeTable =
JdbcCatalogUtils.mergeCatalogTable(DEFAULT_TABLE, tableOfQuery);
- Assertions.assertEquals(DEFAULT_TABLE, mergeTable);
+ Assertions.assertEquals(DEFAULT_TABLE.getTableId(), mergeTable.getTableId());
+ Assertions.assertEquals(DEFAULT_TABLE.getOptions(), mergeTable.getOptions());
+ Assertions.assertEquals(DEFAULT_TABLE.getComment(), mergeTable.getComment());
+ Assertions.assertEquals(DEFAULT_TABLE.getCatalogName(), mergeTable.getCatalogName());
+ Assertions.assertNotEquals(DEFAULT_TABLE.getTableSchema(), mergeTable.getTableSchema());
+ Assertions.assertEquals(
+ DEFAULT_TABLE.getTableSchema().getPrimaryKey(),
+ mergeTable.getTableSchema().getPrimaryKey());
+ Assertions.assertEquals(
+ DEFAULT_TABLE.getTableSchema().getConstraintKeys(),
+ mergeTable.getTableSchema().getConstraintKeys());
+
+ Map<String, Column> columnMap =
+ DEFAULT_TABLE.getTableSchema().getColumns().stream()
+ .collect(Collectors.toMap(e -> e.getName(), e -> e));
+ List<Column> sortByQueryColumns =
+ tableOfQuery.getTableSchema().getColumns().stream()
+ .map(e -> columnMap.get(e.getName()))
+ .collect(Collectors.toList());
+ Assertions.assertEquals(sortByQueryColumns, mergeTable.getTableSchema().getColumns());
}
@Test