This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7d8028a60b [Improve][JdbcSource] Optimize catalog-table metadata merge logic (#5828)
7d8028a60b is described below

commit 7d8028a60b81beba2265d11351726d90dc69eb59
Author: hailin0 <[email protected]>
AuthorDate: Sat Nov 11 11:23:52 2023 +0800

    [Improve][JdbcSource] Optimize catalog-table metadata merge logic (#5828)
---
 .../seatunnel/jdbc/utils/JdbcCatalogUtils.java     | 113 ++++----
 .../seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java | 295 +++++++++++++++++++++
 2 files changed, 359 insertions(+), 49 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index 306a0552cf..0760846701 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -173,64 +173,58 @@ public class JdbcCatalogUtils {
         return jdbcCatalog.getTable(tableConfig.getQuery());
     }
 
-    private static CatalogTable mergeCatalogTable(
-            CatalogTable tableOfPath, CatalogTable tableOfQuery) {
-        String catalogName =
-                tableOfQuery.getTableId() == null
-                        ? DEFAULT_CATALOG_NAME
-                        : tableOfQuery.getTableId().getCatalogName();
-        TableIdentifier tableIdentifier =
-                TableIdentifier.of(
-                        catalogName,
-                        tableOfPath.getTableId().getDatabaseName(),
-                        tableOfPath.getTableId().getSchemaName(),
-                        tableOfPath.getTableId().getTableName());
-
+    static CatalogTable mergeCatalogTable(CatalogTable tableOfPath, 
CatalogTable tableOfQuery) {
         TableSchema tableSchemaOfPath = tableOfPath.getTableSchema();
         Map<String, Column> columnsOfPath =
                 tableSchemaOfPath.getColumns().stream()
-                        .collect(Collectors.toMap(Column::getName, 
Function.identity()));
-        Set<String> columnKeysOfPath = columnsOfPath.keySet();
+                        .collect(
+                                Collectors.toMap(
+                                        Column::getName,
+                                        Function.identity(),
+                                        (o1, o2) -> o1,
+                                        LinkedHashMap::new));
         TableSchema tableSchemaOfQuery = tableOfQuery.getTableSchema();
         Map<String, Column> columnsOfQuery =
                 tableSchemaOfQuery.getColumns().stream()
-                        .collect(Collectors.toMap(Column::getName, 
Function.identity()));
+                        .collect(
+                                Collectors.toMap(
+                                        Column::getName,
+                                        Function.identity(),
+                                        (o1, o2) -> o1,
+                                        LinkedHashMap::new));
         Set<String> columnKeysOfQuery = columnsOfQuery.keySet();
 
-        if (columnKeysOfPath.equals(columnKeysOfQuery)) {
-            boolean schemaEquals =
-                    columnKeysOfPath.stream()
-                            .allMatch(
-                                    key ->
-                                            columnsOfPath
-                                                    .get(key)
-                                                    .getDataType()
-                                                    
.equals(columnsOfQuery.get(key).getDataType()));
-            if (schemaEquals) {
-                return CatalogTable.of(
-                        tableIdentifier,
-                        TableSchema.builder()
-                                .primaryKey(tableSchemaOfPath.getPrimaryKey())
-                                
.constraintKey(tableSchemaOfPath.getConstraintKeys())
-                                .columns(tableSchemaOfQuery.getColumns())
-                                .build(),
-                        tableOfPath.getOptions(),
-                        tableOfPath.getPartitionKeys(),
-                        tableOfPath.getComment(),
-                        tableIdentifier.getCatalogName());
-            }
+        List<Column> columnsOfMerge =
+                tableSchemaOfQuery.getColumns().stream()
+                        .filter(
+                                column ->
+                                        
columnsOfPath.containsKey(column.getName())
+                                                && columnsOfPath
+                                                        .get(column.getName())
+                                                        .getDataType()
+                                                        .equals(
+                                                                columnsOfQuery
+                                                                        
.get(column.getName())
+                                                                        
.getDataType()))
+                        .map(column -> columnsOfPath.get(column.getName()))
+                        .collect(Collectors.toList());
+        boolean schemaIncludeAllColumns = columnsOfMerge.size() == 
columnKeysOfQuery.size();
+        boolean schemaEquals =
+                schemaIncludeAllColumns && columnsOfMerge.size() == 
columnsOfPath.size();
+        if (schemaEquals) {
+            return tableOfPath;
         }
 
         PrimaryKey primaryKeyOfPath = tableSchemaOfPath.getPrimaryKey();
         List<ConstraintKey> constraintKeysOfPath = 
tableSchemaOfPath.getConstraintKeys();
         List<String> partitionKeysOfPath = tableOfPath.getPartitionKeys();
-        PrimaryKey primaryKeyOfQuery = null;
-        List<ConstraintKey> constraintKeysOfQuery = new ArrayList<>();
-        List<String> partitionKeysOfQuery = new ArrayList<>();
+        PrimaryKey primaryKeyOfMerge = null;
+        List<ConstraintKey> constraintKeysOfMerge = new ArrayList<>();
+        List<String> partitionKeysOfMerge = new ArrayList<>();
 
         if (primaryKeyOfPath != null
                 && 
columnKeysOfQuery.containsAll(primaryKeyOfPath.getColumnNames())) {
-            primaryKeyOfQuery = primaryKeyOfPath;
+            primaryKeyOfMerge = primaryKeyOfPath;
         }
         if (constraintKeysOfPath != null) {
             for (ConstraintKey constraintKey : constraintKeysOfPath) {
@@ -239,26 +233,47 @@ public class JdbcCatalogUtils {
                                 .map(e -> e.getColumnName())
                                 .collect(Collectors.toSet());
                 if (columnKeysOfQuery.containsAll(constraintKeyFields)) {
-                    constraintKeysOfQuery.add(constraintKey);
+                    constraintKeysOfMerge.add(constraintKey);
                 }
             }
         }
         if (partitionKeysOfPath != null && 
columnKeysOfQuery.containsAll(partitionKeysOfPath)) {
-            partitionKeysOfQuery = partitionKeysOfPath;
+            partitionKeysOfMerge = partitionKeysOfPath;
+        }
+        if (schemaIncludeAllColumns) {
+            return CatalogTable.of(
+                    tableOfPath.getTableId(),
+                    TableSchema.builder()
+                            .primaryKey(primaryKeyOfMerge)
+                            .constraintKey(constraintKeysOfMerge)
+                            .columns(columnsOfMerge)
+                            .build(),
+                    tableOfPath.getOptions(),
+                    partitionKeysOfMerge,
+                    tableOfPath.getComment());
         }
 
+        String catalogName =
+                tableOfQuery.getTableId() == null
+                        ? DEFAULT_CATALOG_NAME
+                        : tableOfQuery.getTableId().getCatalogName();
+        TableIdentifier tableIdentifier =
+                TableIdentifier.of(
+                        catalogName,
+                        tableOfPath.getTableId().getDatabaseName(),
+                        tableOfPath.getTableId().getSchemaName(),
+                        tableOfPath.getTableId().getTableName());
         CatalogTable mergedCatalogTable =
                 CatalogTable.of(
                         tableIdentifier,
                         TableSchema.builder()
-                                .primaryKey(primaryKeyOfQuery)
-                                .constraintKey(constraintKeysOfQuery)
+                                .primaryKey(primaryKeyOfMerge)
+                                .constraintKey(constraintKeysOfMerge)
                                 .columns(tableSchemaOfQuery.getColumns())
                                 .build(),
                         tableOfPath.getOptions(),
-                        partitionKeysOfQuery,
-                        tableOfPath.getComment(),
-                        tableIdentifier.getCatalogName());
+                        partitionKeysOfMerge,
+                        tableOfPath.getComment());
 
         log.info("Merged catalog table of path {}", 
tableOfPath.getTableId().toTablePath());
         return mergedCatalogTable;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
new file mode 100644
index 0000000000..5fbc658a5e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+public class JdbcCatalogUtilsTest {
+    private static final CatalogTable DEFAULT_TABLE =
+            CatalogTable.of(
+                    TableIdentifier.of("mysql-1", "database-x", null, 
"table-x"),
+                    TableSchema.builder()
+                            .column(
+                                    PhysicalColumn.of(
+                                            "f1",
+                                            BasicType.LONG_TYPE,
+                                            null,
+                                            false,
+                                            null,
+                                            null,
+                                            "int unsigned",
+                                            false,
+                                            false,
+                                            null,
+                                            null,
+                                            null))
+                            .column(
+                                    PhysicalColumn.of(
+                                            "f2",
+                                            BasicType.STRING_TYPE,
+                                            10,
+                                            false,
+                                            null,
+                                            null,
+                                            "varchar(10)",
+                                            false,
+                                            false,
+                                            null,
+                                            null,
+                                            null))
+                            .column(
+                                    PhysicalColumn.of(
+                                            "f3",
+                                            BasicType.STRING_TYPE,
+                                            20,
+                                            false,
+                                            null,
+                                            null,
+                                            "varchar(20)",
+                                            false,
+                                            false,
+                                            null,
+                                            null,
+                                            null))
+                            .primaryKey(PrimaryKey.of("pk1", 
Arrays.asList("f1")))
+                            .constraintKey(
+                                    ConstraintKey.of(
+                                            
ConstraintKey.ConstraintType.UNIQUE_KEY,
+                                            "uk1",
+                                            Arrays.asList(
+                                                    
ConstraintKey.ConstraintKeyColumn.of(
+                                                            "f2", 
ConstraintKey.ColumnSortType.ASC),
+                                                    
ConstraintKey.ConstraintKeyColumn.of(
+                                                            "f3",
+                                                            
ConstraintKey.ColumnSortType.ASC))))
+                            .build(),
+                    Collections.emptyMap(),
+                    Collections.singletonList("f2"),
+                    null);
+
+    @Test
+    public void testColumnEqualsMerge() {
+        CatalogTable tableOfQuery =
+                CatalogTable.of(
+                        TableIdentifier.of("default", null, null, "default"),
+                        TableSchema.builder()
+                                .column(
+                                        PhysicalColumn.of(
+                                                "f1",
+                                                BasicType.LONG_TYPE,
+                                                null,
+                                                true,
+                                                null,
+                                                null,
+                                                null,
+                                                false,
+                                                false,
+                                                null,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "f2",
+                                                BasicType.STRING_TYPE,
+                                                10,
+                                                true,
+                                                null,
+                                                null,
+                                                null,
+                                                false,
+                                                false,
+                                                null,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "f3",
+                                                BasicType.STRING_TYPE,
+                                                20,
+                                                false,
+                                                null,
+                                                null,
+                                                null,
+                                                false,
+                                                false,
+                                                null,
+                                                null,
+                                                null))
+                                .build(),
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        null);
+
+        CatalogTable mergeTable = 
JdbcCatalogUtils.mergeCatalogTable(DEFAULT_TABLE, tableOfQuery);
+        Assertions.assertEquals(DEFAULT_TABLE, mergeTable);
+    }
+
+    @Test
+    public void testColumnIncludeMerge() {
+        CatalogTable tableOfQuery =
+                CatalogTable.of(
+                        TableIdentifier.of("default", null, null, "default"),
+                        TableSchema.builder()
+                                .column(
+                                        PhysicalColumn.of(
+                                                "f1",
+                                                BasicType.LONG_TYPE,
+                                                null,
+                                                true,
+                                                null,
+                                                null,
+                                                null,
+                                                false,
+                                                false,
+                                                null,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "f3",
+                                                BasicType.STRING_TYPE,
+                                                20,
+                                                false,
+                                                null,
+                                                null,
+                                                null,
+                                                false,
+                                                false,
+                                                null,
+                                                null,
+                                                null))
+                                .build(),
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        null);
+
+        CatalogTable mergeTable = 
JdbcCatalogUtils.mergeCatalogTable(DEFAULT_TABLE, tableOfQuery);
+
+        Assertions.assertEquals(DEFAULT_TABLE.getTableId(), 
mergeTable.getTableId());
+        Assertions.assertEquals(
+                DEFAULT_TABLE.getTableSchema().getPrimaryKey(),
+                mergeTable.getTableSchema().getPrimaryKey());
+        Assertions.assertEquals(
+                DEFAULT_TABLE.getTableSchema().getColumns().stream()
+                        .filter(c -> Arrays.asList("f1", 
"f3").contains(c.getName()))
+                        .collect(Collectors.toList()),
+                mergeTable.getTableSchema().getColumns());
+        Assertions.assertTrue(mergeTable.getPartitionKeys().isEmpty());
+        
Assertions.assertTrue(mergeTable.getTableSchema().getConstraintKeys().isEmpty());
+    }
+
+    @Test
+    public void testColumnNotIncludeMerge() {
+        CatalogTable tableOfQuery =
+                CatalogTable.of(
+                        TableIdentifier.of("default", null, null, "default"),
+                        TableSchema.builder()
+                                .column(
+                                        PhysicalColumn.of(
+                                                "f1",
+                                                BasicType.LONG_TYPE,
+                                                null,
+                                                true,
+                                                null,
+                                                null,
+                                                null,
+                                                false,
+                                                false,
+                                                null,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "f2",
+                                                BasicType.STRING_TYPE,
+                                                10,
+                                                true,
+                                                null,
+                                                null,
+                                                null,
+                                                false,
+                                                false,
+                                                null,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "f3",
+                                                BasicType.STRING_TYPE,
+                                                20,
+                                                false,
+                                                null,
+                                                null,
+                                                null,
+                                                false,
+                                                false,
+                                                null,
+                                                null,
+                                                null))
+                                .column(
+                                        PhysicalColumn.of(
+                                                "f4",
+                                                BasicType.STRING_TYPE,
+                                                20,
+                                                false,
+                                                null,
+                                                null,
+                                                null,
+                                                false,
+                                                false,
+                                                null,
+                                                null,
+                                                null))
+                                .build(),
+                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        null);
+
+        CatalogTable mergeTable = 
JdbcCatalogUtils.mergeCatalogTable(DEFAULT_TABLE, tableOfQuery);
+
+        Assertions.assertEquals(
+                DEFAULT_TABLE.getTableId().toTablePath(), 
mergeTable.getTableId().toTablePath());
+        Assertions.assertEquals(DEFAULT_TABLE.getPartitionKeys(), 
mergeTable.getPartitionKeys());
+        Assertions.assertEquals(
+                DEFAULT_TABLE.getTableSchema().getPrimaryKey(),
+                mergeTable.getTableSchema().getPrimaryKey());
+        Assertions.assertEquals(
+                DEFAULT_TABLE.getTableSchema().getConstraintKeys(),
+                mergeTable.getTableSchema().getConstraintKeys());
+
+        Assertions.assertEquals(
+                tableOfQuery.getTableId().getCatalogName(),
+                mergeTable.getTableId().getCatalogName());
+        Assertions.assertEquals(
+                tableOfQuery.getTableSchema().getColumns(),
+                mergeTable.getTableSchema().getColumns());
+    }
+}

Reply via email to