This is an automated email from the ASF dual-hosted git repository. zhouyao2023 pushed a commit to branch 2.3.4-release in repository https://gitbox.apache.org/repos/asf/seatunnel.git
commit 60a3cc6268e07ec97da9217d9f24c26f4920a9d1 Author: Eric <[email protected]> AuthorDate: Thu Jan 25 12:06:49 2024 +0800 [Improve][Catalog] Use default tablepath when can not get the tablepath from source config (#6276) * Use default tablepath when can not get the tablepath from source config * revert SqlServerCDCIT run in flink engine --- .../api/table/catalog/CatalogTableUtil.java | 2 +- .../seatunnel/api/table/catalog/TablePath.java | 2 +- .../api/table/catalog/CatalogTableUtilTest.java | 15 ++++++++ .../src/test/resources/conf/default_tablepath.conf | 45 ++++++++++++++++++++++ .../fake/source/FakeDataGeneratorTest.java | 7 ++++ .../connector/cdc/sqlserver/SqlServerCDCIT.java | 2 +- 6 files changed, 70 insertions(+), 3 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java index 6b8d19ea71..6f2b6adeb2 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java @@ -215,7 +215,7 @@ public class CatalogTableUtil implements Serializable { } else { Optional<String> resultTableNameOptional = readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME); - tablePath = resultTableNameOptional.map(TablePath::of).orElse(TablePath.EMPTY); + tablePath = resultTableNameOptional.map(TablePath::of).orElse(TablePath.DEFAULT); } return CatalogTable.of( diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java index 60f4b691bd..1257262187 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java @@ -34,7 +34,7 @@ public final class TablePath implements Serializable { private final String schemaName; private final String tableName; - public static final TablePath EMPTY = TablePath.of(null, null, null); + public static final TablePath DEFAULT = TablePath.of("default", "default", "default"); public static TablePath of(String fullName) { return of(fullName, false); diff --git a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java index 9b579fbd90..fff2b4e5de 100644 --- a/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java +++ b/seatunnel-api/src/test/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtilTest.java @@ -138,6 +138,21 @@ public class CatalogTableUtilTest { Thread.currentThread().getContextClassLoader())); } + @Test + public void testDefaultTablePath() throws FileNotFoundException, URISyntaxException { + String path = getTestConfigFile("/conf/default_tablepath.conf"); + Config config = ConfigFactory.parseFile(new File(path)); + Config source = config.getConfigList("source").get(0); + ReadonlyConfig sourceReadonlyConfig = ReadonlyConfig.fromConfig(source); + CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(sourceReadonlyConfig); + Assertions.assertEquals( + TablePath.DEFAULT.getDatabaseName(), catalogTable.getTablePath().getDatabaseName()); + Assertions.assertEquals( + TablePath.DEFAULT.getSchemaName(), catalogTable.getTablePath().getSchemaName()); + Assertions.assertEquals( + TablePath.DEFAULT.getTableName(), catalogTable.getTablePath().getTableName()); + } + @Test public void testGenericRowSchemaTest() throws FileNotFoundException, URISyntaxException { String path = getTestConfigFile("/conf/generic_row.schema.conf"); diff --git a/seatunnel-api/src/test/resources/conf/default_tablepath.conf b/seatunnel-api/src/test/resources/conf/default_tablepath.conf new file mode 100644 index 0000000000..21cd9d5fd8 --- /dev/null +++ b/seatunnel-api/src/test/resources/conf/default_tablepath.conf @@ -0,0 +1,45 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + job.mode = "BATCH" +} + +source { + MongoDB-CDC { + hosts = "mongo0:27017" + database = ["inventory"] + collection = ["inventory.products"] + username = superuser + password = superpw + schema = { + fields { + "_id": string, + "name": string, + "description": string, + "weight": string + } + } + } +} + +transform { +} + +sink { + Console{} +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java index bf962187f2..97c8311e6c 100644 --- a/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java +++ b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; +import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -84,15 +85,21 @@ public class FakeDataGeneratorTest { public void testRowDataParse(String conf) throws FileNotFoundException, URISyntaxException { SeaTunnelRow row1 = new SeaTunnelRow(new Object[] {1L, "A", 100}); row1.setRowKind(RowKind.INSERT); + row1.setTableId(TablePath.DEFAULT.getFullName()); SeaTunnelRow row2 = new SeaTunnelRow(new Object[] {2L, "B", 100}); row2.setRowKind(RowKind.INSERT); + row2.setTableId(TablePath.DEFAULT.getFullName()); SeaTunnelRow row3 = new SeaTunnelRow(new Object[] {3L, "C", 100}); row3.setRowKind(RowKind.INSERT); + row3.setTableId(TablePath.DEFAULT.getFullName()); SeaTunnelRow row1UpdateBefore = new SeaTunnelRow(new Object[] {1L, "A", 100}); + row1UpdateBefore.setTableId(TablePath.DEFAULT.getFullName()); row1UpdateBefore.setRowKind(RowKind.UPDATE_BEFORE); SeaTunnelRow row1UpdateAfter = new SeaTunnelRow(new Object[] {1L, "A_1", 100}); + row1UpdateAfter.setTableId(TablePath.DEFAULT.getFullName()); row1UpdateAfter.setRowKind(RowKind.UPDATE_AFTER); SeaTunnelRow row2Delete = new SeaTunnelRow(new Object[] {2L, "B", 100}); + row2Delete.setTableId(TablePath.DEFAULT.getFullName()); row2Delete.setRowKind(RowKind.DELETE); List<SeaTunnelRow> expected = Arrays.asList(row1, row2, row3, row1UpdateBefore, row1UpdateAfter, row2Delete); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java index 42d2340817..1216c69645 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java @@ -65,7 +65,7 @@ import static org.awaitility.Awaitility.await; @Slf4j @DisabledOnContainer( value = {}, - type = {EngineType.SPARK, EngineType.FLINK}, + type = {EngineType.SPARK}, disabledReason = "Currently SPARK do not support cdc") public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
