This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7d8028a60b [Improve][JdbcSource] Optimize catalog-table metadata merge logic (#5828)
7d8028a60b is described below
commit 7d8028a60b81beba2265d11351726d90dc69eb59
Author: hailin0 <[email protected]>
AuthorDate: Sat Nov 11 11:23:52 2023 +0800
[Improve][JdbcSource] Optimize catalog-table metadata merge logic (#5828)
---
.../seatunnel/jdbc/utils/JdbcCatalogUtils.java | 113 ++++----
.../seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java | 295 +++++++++++++++++++++
2 files changed, 359 insertions(+), 49 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index 306a0552cf..0760846701 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -173,64 +173,58 @@ public class JdbcCatalogUtils {
return jdbcCatalog.getTable(tableConfig.getQuery());
}
- private static CatalogTable mergeCatalogTable(
- CatalogTable tableOfPath, CatalogTable tableOfQuery) {
- String catalogName =
- tableOfQuery.getTableId() == null
- ? DEFAULT_CATALOG_NAME
- : tableOfQuery.getTableId().getCatalogName();
- TableIdentifier tableIdentifier =
- TableIdentifier.of(
- catalogName,
- tableOfPath.getTableId().getDatabaseName(),
- tableOfPath.getTableId().getSchemaName(),
- tableOfPath.getTableId().getTableName());
-
+ static CatalogTable mergeCatalogTable(CatalogTable tableOfPath,
CatalogTable tableOfQuery) {
TableSchema tableSchemaOfPath = tableOfPath.getTableSchema();
Map<String, Column> columnsOfPath =
tableSchemaOfPath.getColumns().stream()
- .collect(Collectors.toMap(Column::getName,
Function.identity()));
- Set<String> columnKeysOfPath = columnsOfPath.keySet();
+ .collect(
+ Collectors.toMap(
+ Column::getName,
+ Function.identity(),
+ (o1, o2) -> o1,
+ LinkedHashMap::new));
TableSchema tableSchemaOfQuery = tableOfQuery.getTableSchema();
Map<String, Column> columnsOfQuery =
tableSchemaOfQuery.getColumns().stream()
- .collect(Collectors.toMap(Column::getName,
Function.identity()));
+ .collect(
+ Collectors.toMap(
+ Column::getName,
+ Function.identity(),
+ (o1, o2) -> o1,
+ LinkedHashMap::new));
Set<String> columnKeysOfQuery = columnsOfQuery.keySet();
- if (columnKeysOfPath.equals(columnKeysOfQuery)) {
- boolean schemaEquals =
- columnKeysOfPath.stream()
- .allMatch(
- key ->
- columnsOfPath
- .get(key)
- .getDataType()
-
.equals(columnsOfQuery.get(key).getDataType()));
- if (schemaEquals) {
- return CatalogTable.of(
- tableIdentifier,
- TableSchema.builder()
- .primaryKey(tableSchemaOfPath.getPrimaryKey())
-
.constraintKey(tableSchemaOfPath.getConstraintKeys())
- .columns(tableSchemaOfQuery.getColumns())
- .build(),
- tableOfPath.getOptions(),
- tableOfPath.getPartitionKeys(),
- tableOfPath.getComment(),
- tableIdentifier.getCatalogName());
- }
+ List<Column> columnsOfMerge =
+ tableSchemaOfQuery.getColumns().stream()
+ .filter(
+ column ->
+
columnsOfPath.containsKey(column.getName())
+ && columnsOfPath
+ .get(column.getName())
+ .getDataType()
+ .equals(
+ columnsOfQuery
+
.get(column.getName())
+
.getDataType()))
+ .map(column -> columnsOfPath.get(column.getName()))
+ .collect(Collectors.toList());
+ boolean schemaIncludeAllColumns = columnsOfMerge.size() ==
columnKeysOfQuery.size();
+ boolean schemaEquals =
+ schemaIncludeAllColumns && columnsOfMerge.size() ==
columnsOfPath.size();
+ if (schemaEquals) {
+ return tableOfPath;
}
PrimaryKey primaryKeyOfPath = tableSchemaOfPath.getPrimaryKey();
List<ConstraintKey> constraintKeysOfPath =
tableSchemaOfPath.getConstraintKeys();
List<String> partitionKeysOfPath = tableOfPath.getPartitionKeys();
- PrimaryKey primaryKeyOfQuery = null;
- List<ConstraintKey> constraintKeysOfQuery = new ArrayList<>();
- List<String> partitionKeysOfQuery = new ArrayList<>();
+ PrimaryKey primaryKeyOfMerge = null;
+ List<ConstraintKey> constraintKeysOfMerge = new ArrayList<>();
+ List<String> partitionKeysOfMerge = new ArrayList<>();
if (primaryKeyOfPath != null
&&
columnKeysOfQuery.containsAll(primaryKeyOfPath.getColumnNames())) {
- primaryKeyOfQuery = primaryKeyOfPath;
+ primaryKeyOfMerge = primaryKeyOfPath;
}
if (constraintKeysOfPath != null) {
for (ConstraintKey constraintKey : constraintKeysOfPath) {
@@ -239,26 +233,47 @@ public class JdbcCatalogUtils {
.map(e -> e.getColumnName())
.collect(Collectors.toSet());
if (columnKeysOfQuery.containsAll(constraintKeyFields)) {
- constraintKeysOfQuery.add(constraintKey);
+ constraintKeysOfMerge.add(constraintKey);
}
}
}
if (partitionKeysOfPath != null &&
columnKeysOfQuery.containsAll(partitionKeysOfPath)) {
- partitionKeysOfQuery = partitionKeysOfPath;
+ partitionKeysOfMerge = partitionKeysOfPath;
+ }
+ if (schemaIncludeAllColumns) {
+ return CatalogTable.of(
+ tableOfPath.getTableId(),
+ TableSchema.builder()
+ .primaryKey(primaryKeyOfMerge)
+ .constraintKey(constraintKeysOfMerge)
+ .columns(columnsOfMerge)
+ .build(),
+ tableOfPath.getOptions(),
+ partitionKeysOfMerge,
+ tableOfPath.getComment());
}
+ String catalogName =
+ tableOfQuery.getTableId() == null
+ ? DEFAULT_CATALOG_NAME
+ : tableOfQuery.getTableId().getCatalogName();
+ TableIdentifier tableIdentifier =
+ TableIdentifier.of(
+ catalogName,
+ tableOfPath.getTableId().getDatabaseName(),
+ tableOfPath.getTableId().getSchemaName(),
+ tableOfPath.getTableId().getTableName());
CatalogTable mergedCatalogTable =
CatalogTable.of(
tableIdentifier,
TableSchema.builder()
- .primaryKey(primaryKeyOfQuery)
- .constraintKey(constraintKeysOfQuery)
+ .primaryKey(primaryKeyOfMerge)
+ .constraintKey(constraintKeysOfMerge)
.columns(tableSchemaOfQuery.getColumns())
.build(),
tableOfPath.getOptions(),
- partitionKeysOfQuery,
- tableOfPath.getComment(),
- tableIdentifier.getCatalogName());
+ partitionKeysOfMerge,
+ tableOfPath.getComment());
log.info("Merged catalog table of path {}",
tableOfPath.getTableId().toTablePath());
return mergedCatalogTable;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
new file mode 100644
index 0000000000..5fbc658a5e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.utils;
+
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+public class JdbcCatalogUtilsTest {
+ private static final CatalogTable DEFAULT_TABLE =
+ CatalogTable.of(
+ TableIdentifier.of("mysql-1", "database-x", null,
"table-x"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "f1",
+ BasicType.LONG_TYPE,
+ null,
+ false,
+ null,
+ null,
+ "int unsigned",
+ false,
+ false,
+ null,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "f2",
+ BasicType.STRING_TYPE,
+ 10,
+ false,
+ null,
+ null,
+ "varchar(10)",
+ false,
+ false,
+ null,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "f3",
+ BasicType.STRING_TYPE,
+ 20,
+ false,
+ null,
+ null,
+ "varchar(20)",
+ false,
+ false,
+ null,
+ null,
+ null))
+ .primaryKey(PrimaryKey.of("pk1",
Arrays.asList("f1")))
+ .constraintKey(
+ ConstraintKey.of(
+
ConstraintKey.ConstraintType.UNIQUE_KEY,
+ "uk1",
+ Arrays.asList(
+
ConstraintKey.ConstraintKeyColumn.of(
+ "f2",
ConstraintKey.ColumnSortType.ASC),
+
ConstraintKey.ConstraintKeyColumn.of(
+ "f3",
+
ConstraintKey.ColumnSortType.ASC))))
+ .build(),
+ Collections.emptyMap(),
+ Collections.singletonList("f2"),
+ null);
+
+ @Test
+ public void testColumnEqualsMerge() {
+ CatalogTable tableOfQuery =
+ CatalogTable.of(
+ TableIdentifier.of("default", null, null, "default"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "f1",
+ BasicType.LONG_TYPE,
+ null,
+ true,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "f2",
+ BasicType.STRING_TYPE,
+ 10,
+ true,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "f3",
+ BasicType.STRING_TYPE,
+ 20,
+ false,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ null))
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ null);
+
+ CatalogTable mergeTable =
JdbcCatalogUtils.mergeCatalogTable(DEFAULT_TABLE, tableOfQuery);
+ Assertions.assertEquals(DEFAULT_TABLE, mergeTable);
+ }
+
+ @Test
+ public void testColumnIncludeMerge() {
+ CatalogTable tableOfQuery =
+ CatalogTable.of(
+ TableIdentifier.of("default", null, null, "default"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "f1",
+ BasicType.LONG_TYPE,
+ null,
+ true,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "f3",
+ BasicType.STRING_TYPE,
+ 20,
+ false,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ null))
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ null);
+
+ CatalogTable mergeTable =
JdbcCatalogUtils.mergeCatalogTable(DEFAULT_TABLE, tableOfQuery);
+
+ Assertions.assertEquals(DEFAULT_TABLE.getTableId(),
mergeTable.getTableId());
+ Assertions.assertEquals(
+ DEFAULT_TABLE.getTableSchema().getPrimaryKey(),
+ mergeTable.getTableSchema().getPrimaryKey());
+ Assertions.assertEquals(
+ DEFAULT_TABLE.getTableSchema().getColumns().stream()
+ .filter(c -> Arrays.asList("f1",
"f3").contains(c.getName()))
+ .collect(Collectors.toList()),
+ mergeTable.getTableSchema().getColumns());
+ Assertions.assertTrue(mergeTable.getPartitionKeys().isEmpty());
+
Assertions.assertTrue(mergeTable.getTableSchema().getConstraintKeys().isEmpty());
+ }
+
+ @Test
+ public void testColumnNotIncludeMerge() {
+ CatalogTable tableOfQuery =
+ CatalogTable.of(
+ TableIdentifier.of("default", null, null, "default"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "f1",
+ BasicType.LONG_TYPE,
+ null,
+ true,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "f2",
+ BasicType.STRING_TYPE,
+ 10,
+ true,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "f3",
+ BasicType.STRING_TYPE,
+ 20,
+ false,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "f4",
+ BasicType.STRING_TYPE,
+ 20,
+ false,
+ null,
+ null,
+ null,
+ false,
+ false,
+ null,
+ null,
+ null))
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ null);
+
+ CatalogTable mergeTable =
JdbcCatalogUtils.mergeCatalogTable(DEFAULT_TABLE, tableOfQuery);
+
+ Assertions.assertEquals(
+ DEFAULT_TABLE.getTableId().toTablePath(),
mergeTable.getTableId().toTablePath());
+ Assertions.assertEquals(DEFAULT_TABLE.getPartitionKeys(),
mergeTable.getPartitionKeys());
+ Assertions.assertEquals(
+ DEFAULT_TABLE.getTableSchema().getPrimaryKey(),
+ mergeTable.getTableSchema().getPrimaryKey());
+ Assertions.assertEquals(
+ DEFAULT_TABLE.getTableSchema().getConstraintKeys(),
+ mergeTable.getTableSchema().getConstraintKeys());
+
+ Assertions.assertEquals(
+ tableOfQuery.getTableId().getCatalogName(),
+ mergeTable.getTableId().getCatalogName());
+ Assertions.assertEquals(
+ tableOfQuery.getTableSchema().getColumns(),
+ mergeTable.getTableSchema().getColumns());
+ }
+}