This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new da40727d2 [FLINK-37371][cdc-connector/postgresql] Postgres CDC incremental source fails to handle upper-case table and column names
da40727d2 is described below

commit da40727d29c8e3d3c637a52d102b3ed636ebf13a
Author: Sergei Morozov <moro...@tut.by>
AuthorDate: Thu Mar 27 05:11:46 2025 -0700

    [FLINK-37371][cdc-connector/postgresql] Postgres CDC incremental source fails to handle upper-case table and column names

    This closes #3929.
---
 .../postgres/source/utils/PostgresQueryUtils.java  |  25 ++-
 .../postgres/source/PostgresDialectTest.java       |   4 +-
 .../postgres/source/PostgresSourceITCase.java      | 210 +++++++++++----------
 .../source/fetch/PostgresScanFetchTaskTest.java    |  75 ++++----
 .../source/reader/PostgresSourceReaderTest.java    |   6 +-
 .../connectors/postgres/testutils/TestTable.java   |  20 +-
 .../connectors/postgres/testutils/TestTableId.java |  41 ++++
 .../src/test/resources/ddl/customer.sql            |  18 +-
 8 files changed, 220 insertions(+), 179 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
index e7408be34..9c0d55847 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
@@ -69,12 +69,17 @@ public class PostgresQueryUtils {
         // https://stackoverflow.com/questions/7943233/fast-way-to-discover-the-row-count-of-a-table-in-postgresql
         // NOTE: it requires ANALYZE or VACUUM to be run first in PostgreSQL.
         final String query =
-                String.format(
-                        "SELECT reltuples::bigint FROM pg_class WHERE oid = to_regclass('%s')",
-                        tableId.toString());
-
-        return jdbc.queryAndMap(
+                "SELECT reltuples::bigint"
+                        + " FROM pg_class c"
+                        + " JOIN pg_namespace n ON n.oid = c.relnamespace"
+                        + " WHERE n.nspname = ?"
+                        + " AND c.relname = ?";
+        return jdbc.prepareQueryAndMap(
                 query,
+                ps -> {
+                    ps.setString(1, tableId.schema());
+                    ps.setString(2, tableId.table());
+                },
                 rs -> {
                     if (!rs.next()) {
                         throw new SQLException(
@@ -244,8 +249,8 @@ public class PostgresQueryUtils {
         }
     }

-    public static String quote(String dbOrTableName) {
-        return "\"" + dbOrTableName + "\"";
+    public static String quote(String name) {
+        return "\"" + name.replace("\"", "\"\"") + "\"";
     }

     private static String quoteForMinMax(Column column) {
@@ -292,7 +297,7 @@ public class PostgresQueryUtils {
                 fieldNamesIt.hasNext(); ) {
             String fieldName = fieldNamesIt.next();
             boolean isUUID = uuidFields.contains(fieldName);
-            sql.append(fieldName).append(predicate).append(castParam(isUUID));
+            sql.append(quote(fieldName)).append(predicate).append(castParam(isUUID));
             if (fieldNamesIt.hasNext()) {
                 sql.append(" AND ");
             }
@@ -303,7 +308,7 @@ public class PostgresQueryUtils {
         StringBuilder sql = new StringBuilder();
         for (Iterator<String> fieldNamesIt = pkRowType.getFieldNames().iterator();
                 fieldNamesIt.hasNext(); ) {
-            sql.append(fieldNamesIt.next());
+            sql.append(quote(fieldNamesIt.next()));
             if (fieldNamesIt.hasNext()) {
                 sql.append(" , ");
             }
         }
@@ -315,7 +320,7 @@ public class PostgresQueryUtils {
         StringBuilder sql = new StringBuilder();
         for (Iterator<String> fieldNamesIt = pkRowType.getFieldNames().iterator();
                 fieldNamesIt.hasNext(); ) {
-            sql.append("MAX(" + fieldNamesIt.next() + ")");
+            sql.append("MAX(" + quote(fieldNamesIt.next()) + ")");
             if (fieldNamesIt.hasNext()) {
                 sql.append(" , ");
             }
         }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java
index b92f1c662..898cec1fb 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java
@@ -56,13 +56,13 @@ class PostgresDialectTest extends PostgresTestBase {
         // get table named 'customer.customers' from customDatabase which is actual in
         // inventoryDatabase
         PostgresSourceConfigFactory configFactoryOfCustomDatabase =
-                getMockPostgresSourceConfigFactory(customDatabase, "customer", "customers", 10);
+                getMockPostgresSourceConfigFactory(customDatabase, "customer", "Customers", 10);
         PostgresDialect dialectOfcustomDatabase =
                 new PostgresDialect(configFactoryOfCustomDatabase.create(0));
         List<TableId> tableIdsOfcustomDatabase =
                 dialectOfcustomDatabase.discoverDataCollections(
                         configFactoryOfCustomDatabase.create(0));
-        Assertions.assertThat(tableIdsOfcustomDatabase.get(0)).hasToString("customer.customers");
+        Assertions.assertThat(tableIdsOfcustomDatabase.get(0)).hasToString("customer.Customers");

         // get table named 'inventory.products' from customDatabase which is actual in
         // inventoryDatabase
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index 708e2ec1e..d2c50d15b
100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java @@ -27,8 +27,10 @@ import org.apache.flink.cdc.connectors.postgres.PostgresTestBase; import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; import org.apache.flink.cdc.connectors.postgres.testutils.PostgresTestUtils; import org.apache.flink.cdc.connectors.postgres.testutils.TestTable; +import org.apache.flink.cdc.connectors.postgres.testutils.TestTableId; import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase; import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy; +import org.apache.flink.core.execution.JobClient; import org.apache.flink.runtime.minicluster.RpcServiceSharing; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -56,6 +58,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.io.IOException; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; @@ -64,6 +67,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -155,7 +159,7 @@ class PostgresSourceITCase extends PostgresTestBase { 1, PostgresTestUtils.FailoverType.NONE, PostgresTestUtils.FailoverPhase.NEVER, - new String[] {"customers"}, + new String[] {"Customers"}, scanStartupMode); } @@ -166,7 +170,7 @@ class PostgresSourceITCase extends PostgresTestBase { 4, PostgresTestUtils.FailoverType.NONE, PostgresTestUtils.FailoverPhase.NEVER, - new String[] {"customers"}, + new String[] {"Customers"}, scanStartupMode); } @@ -177,7 +181,7 @@ class PostgresSourceITCase extends PostgresTestBase { 1, PostgresTestUtils.FailoverType.NONE, PostgresTestUtils.FailoverPhase.NEVER, - new String[] {"customers", "customers_1"}, + new String[] {"Customers", "customers_1"}, scanStartupMode); } @@ -188,7 +192,7 @@ class PostgresSourceITCase extends PostgresTestBase { 4, PostgresTestUtils.FailoverType.NONE, PostgresTestUtils.FailoverPhase.NEVER, - new String[] {"customers", "customers_1"}, + new String[] {"Customers", "customers_1"}, scanStartupMode); } @@ -199,7 +203,7 @@ class PostgresSourceITCase extends PostgresTestBase { testPostgresParallelSource( PostgresTestUtils.FailoverType.TM, PostgresTestUtils.FailoverPhase.SNAPSHOT, - new String[] {"customers", "customers_1"}, + new String[] {"Customers", "customers_1"}, scanStartupMode); } @@ -209,7 +213,7 @@ class PostgresSourceITCase extends PostgresTestBase { testPostgresParallelSource( PostgresTestUtils.FailoverType.TM, PostgresTestUtils.FailoverPhase.STREAM, - new String[] {"customers", "customers_1"}, + new String[] {"Customers", "customers_1"}, scanStartupMode); } @@ -219,7 +223,7 @@ class PostgresSourceITCase extends PostgresTestBase { testPostgresParallelSource( PostgresTestUtils.FailoverType.JM, PostgresTestUtils.FailoverPhase.SNAPSHOT, - new String[] {"customers", "customers_1"}, + new String[] {"Customers", "customers_1"}, 
scanStartupMode); } @@ -229,7 +233,7 @@ class PostgresSourceITCase extends PostgresTestBase { testPostgresParallelSource( PostgresTestUtils.FailoverType.JM, PostgresTestUtils.FailoverPhase.STREAM, - new String[] {"customers", "customers_1"}, + new String[] {"Customers", "customers_1"}, scanStartupMode); } @@ -240,7 +244,7 @@ class PostgresSourceITCase extends PostgresTestBase { 1, PostgresTestUtils.FailoverType.TM, PostgresTestUtils.FailoverPhase.SNAPSHOT, - new String[] {"customers"}, + new String[] {"Customers"}, scanStartupMode); } @@ -251,7 +255,7 @@ class PostgresSourceITCase extends PostgresTestBase { 1, PostgresTestUtils.FailoverType.JM, PostgresTestUtils.FailoverPhase.SNAPSHOT, - new String[] {"customers"}, + new String[] {"Customers"}, scanStartupMode); } @@ -296,7 +300,7 @@ class PostgresSourceITCase extends PostgresTestBase { DEFAULT_SCAN_STARTUP_MODE, PostgresTestUtils.FailoverType.TM, PostgresTestUtils.FailoverPhase.SNAPSHOT, - new String[] {"customers"}, + new String[] {"Customers"}, RestartStrategies.fixedDelayRestart(1, 0), Collections.singletonMap("scan.incremental.snapshot.backfill.skip", "true")); } @@ -313,11 +317,11 @@ class PostgresSourceITCase extends PostgresTestBase { String sourceDDL = format( "CREATE TABLE customers (" - + " id BIGINT NOT NULL," - + " name STRING," + + " Id BIGINT NOT NULL," + + " Name STRING," + " address STRING," + " phone_number STRING," - + " primary key (id) not enforced" + + " primary key (Id) not enforced" + ") WITH (" + " 'connector' = 'postgres-cdc'," + " 'scan.incremental.snapshot.enabled' = 'true'," @@ -339,7 +343,7 @@ class PostgresSourceITCase extends PostgresTestBase { customDatabase.getPassword(), customDatabase.getDatabaseName(), SCHEMA_NAME, - "customers", + "Customers", scanStartupMode, slotName); tEnv.executeSql(sourceDDL); @@ -351,7 +355,7 @@ class PostgresSourceITCase extends PostgresTestBase { tableResult, PostgresTestUtils.FailoverType.JM, PostgresTestUtils.FailoverPhase.STREAM, - new String[] {"customers"}); + new String[] {"Customers"}); } // second step: check the stream data @@ -359,9 +363,11 @@ class PostgresSourceITCase extends PostgresTestBase { tableResult, PostgresTestUtils.FailoverType.JM, PostgresTestUtils.FailoverPhase.STREAM, - new String[] {"customers"}); + new String[] {"Customers"}); - tableResult.getJobClient().get().cancel().get(); + Optional<JobClient> optionalJobClient = tableResult.getJobClient(); + assertThat(optionalJobClient).isPresent(); + optionalJobClient.get().cancel().get(); } @ParameterizedTest @@ -603,7 +609,7 @@ class PostgresSourceITCase extends PostgresTestBase { int parallelism = 1; PostgresTestUtils.FailoverType failoverType = PostgresTestUtils.FailoverType.JM; PostgresTestUtils.FailoverPhase failoverPhase = PostgresTestUtils.FailoverPhase.STREAM; - String[] captureCustomerTables = new String[] {"customers"}; + String[] captureCustomerTables = new String[] {"Customers"}; RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = RestartStrategies.fixedDelayRestart(1, 0); boolean skipSnapshotBackfill = false; @@ -617,11 +623,11 @@ class PostgresSourceITCase extends PostgresTestBase { String sourceDDL = format( "CREATE TABLE customers (" - + " id BIGINT NOT NULL," - + " name STRING," + + " Id BIGINT NOT NULL," + + " Name STRING," + " address STRING," + " phone_number STRING," - + " primary key (id) not enforced" + + " primary key (Id) not enforced" + ") WITH (" + " 'connector' = 'postgres-cdc-mock'," + " 'scan.incremental.snapshot.enabled' = 'true'," @@ -658,7 +664,9 @@ 
class PostgresSourceITCase extends PostgresTestBase { // second step: check the stream data checkStreamDataWithHook(tableResult, failoverType, failoverPhase, captureCustomerTables); - tableResult.getJobClient().get().cancel().get(); + Optional<JobClient> optionalJobClient = tableResult.getJobClient(); + assertThat(optionalJobClient).isPresent(); + optionalJobClient.get().cancel().get(); // sleep 1000ms to wait until connections are closed. Thread.sleep(1000L); @@ -675,7 +683,7 @@ class PostgresSourceITCase extends PostgresTestBase { scanStartupMode, PostgresTestUtils.FailoverType.NONE, PostgresTestUtils.FailoverPhase.NEVER, - new String[] {"customers"}, + new String[] {"Customers"}, RestartStrategies.noRestart(), Collections.singletonMap( "scan.incremental.snapshot.chunk.key-column", chunkColumn)); @@ -684,7 +692,7 @@ class PostgresSourceITCase extends PostgresTestBase { .hasStackTraceContaining( String.format( "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.", - chunkColumn, "id", "customer.customers")); + chunkColumn, "Id", "customer.Customers")); } } @@ -709,7 +717,7 @@ class PostgresSourceITCase extends PostgresTestBase { scanStartupMode, PostgresTestUtils.FailoverType.NONE, PostgresTestUtils.FailoverPhase.NEVER, - new String[] {"customers"}, + new String[] {"Customers"}, RestartStrategies.noRestart(), options); try (PostgresConnection connection = getConnection()) { @@ -723,7 +731,7 @@ class PostgresSourceITCase extends PostgresTestBase { int parallelism = 1; PostgresTestUtils.FailoverType failoverType = PostgresTestUtils.FailoverType.TM; PostgresTestUtils.FailoverPhase failoverPhase = PostgresTestUtils.FailoverPhase.STREAM; - String[] captureCustomerTables = new String[] {"customers"}; + String[] captureCustomerTables = new String[] {"Customers"}; RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration = RestartStrategies.fixedDelayRestart(1, 0); boolean skipSnapshotBackfill = false; @@ -737,11 +745,11 @@ class PostgresSourceITCase extends PostgresTestBase { String sourceDDL = format( "CREATE TABLE customers (" - + " id BIGINT NOT NULL," - + " name STRING," + + " Id BIGINT NOT NULL," + + " Name STRING," + " address STRING," + " phone_number STRING," - + " primary key (id) not enforced" + + " primary key (Id) not enforced" + ") WITH (" + " 'connector' = 'postgres-cdc'," + " 'scan.incremental.snapshot.enabled' = 'true'," @@ -780,7 +788,9 @@ class PostgresSourceITCase extends PostgresTestBase { // second step: check the stream data checkStreamDataWithTestLsn(tableResult, failoverType, failoverPhase, captureCustomerTables); - tableResult.getJobClient().get().cancel().get(); + Optional<JobClient> optionalJobClient = tableResult.getJobClient(); + assertThat(optionalJobClient).isPresent(); + optionalJobClient.get().cancel().get(); // sleep 1000ms to wait until connections are closed. 
Thread.sleep(1000L); @@ -799,17 +809,16 @@ class PostgresSourceITCase extends PostgresTestBase { ResolvedSchema customersSchema = new ResolvedSchema( Arrays.asList( - physical("id", BIGINT().notNull()), - physical("name", STRING()), + physical("Id", BIGINT().notNull()), + physical("Name", STRING()), physical("address", STRING()), physical("phone_number", STRING())), new ArrayList<>(), UniqueConstraint.primaryKey("pk", Collections.singletonList("id"))); - TestTable customerTable = - new TestTable(customDatabase, "customer", "customers", customersSchema); - String tableId = customerTable.getTableId(); + TestTableId tableId = new TestTableId("customer", "Customers"); + TestTable table = new TestTable(customersSchema); - PostgresSourceBuilder.PostgresIncrementalSource source = + PostgresSourceBuilder.PostgresIncrementalSource<RowData> source = PostgresSourceBuilder.PostgresIncrementalSource.<RowData>builder() .hostname(customDatabase.getHost()) .port(customDatabase.getDatabasePort()) @@ -817,11 +826,11 @@ class PostgresSourceITCase extends PostgresTestBase { .password(customDatabase.getPassword()) .database(customDatabase.getDatabaseName()) .slotName(slotName) - .tableList(tableId) + .tableList(tableId.toString()) .startupOptions(startupOptions) .skipSnapshotBackfill(skipSnapshotBackfill) .lsnCommitCheckpointsDelay(1) - .deserializer(customerTable.getDeserializer()) + .deserializer(table.getDeserializer()) .build(); // Do some database operations during hook in snapshot period. @@ -830,17 +839,21 @@ class PostgresSourceITCase extends PostgresTestBase { new String[] { String.format( "INSERT INTO %s VALUES (15213, 'user_15213', 'Shanghai', '123567891234')", - tableId), - String.format("UPDATE %s SET address='Pittsburgh' WHERE id=2000", tableId), - String.format("DELETE FROM %s WHERE id=1019", tableId) + tableId.toSql()), + String.format( + "UPDATE %s SET address = 'Pittsburgh' WHERE \"Id\" = 2000", + tableId.toSql()), + String.format("DELETE FROM %s WHERE \"Id\" = 1019", tableId.toSql()) }; SnapshotPhaseHook snapshotPhaseHook = (sourceConfig, split) -> { - PostgresDialect dialect = - new PostgresDialect((PostgresSourceConfig) sourceConfig); - try (PostgresConnection postgresConnection = dialect.openJdbcConnection()) { + try (PostgresDialect dialect = + new PostgresDialect((PostgresSourceConfig) sourceConfig); + PostgresConnection postgresConnection = dialect.openJdbcConnection()) { postgresConnection.execute(statements); postgresConnection.commit(); + } catch (IOException e) { + throw new RuntimeException(e); } }; @@ -861,7 +874,7 @@ class PostgresSourceITCase extends PostgresTestBase { try (CloseableIterator<RowData> iterator = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source") .executeAndCollect()) { - records = fetchRowData(iterator, fetchSize, customerTable::stringify); + records = fetchRowData(iterator, fetchSize, table::stringify); env.close(); } return records; @@ -934,11 +947,11 @@ class PostgresSourceITCase extends PostgresTestBase { String sourceDDL = format( "CREATE TABLE customers (" - + " id BIGINT NOT NULL," - + " name STRING," + + " Id BIGINT NOT NULL," + + " Name STRING," + " address STRING," + " phone_number STRING," - + " primary key (id) not enforced" + + " primary key (Id) not enforced" + ") WITH (" + " 'connector' = 'postgres-cdc'," + " 'scan.incremental.snapshot.enabled' = 'true'," @@ -953,7 +966,7 @@ class PostgresSourceITCase extends PostgresTestBase { + " 'scan.incremental.snapshot.chunk.size' = '100'," + " 'slot.name' = '%s'," + " 
'scan.lsn-commit.checkpoints-num-delay' = '1'" - + "%s" + + " %s" + ")", customDatabase.getHost(), customDatabase.getDatabasePort(), @@ -985,7 +998,9 @@ class PostgresSourceITCase extends PostgresTestBase { // second step: check the stream data checkStreamData(tableResult, failoverType, failoverPhase, captureCustomerTables); - tableResult.getJobClient().get().cancel().get(); + Optional<JobClient> optionalJobClient = tableResult.getJobClient(); + assertThat(optionalJobClient).isPresent(); + optionalJobClient.get().cancel().get(); // sleep 1000ms to wait until connections are closed. Thread.sleep(1000L); @@ -1028,7 +1043,9 @@ class PostgresSourceITCase extends PostgresTestBase { } CloseableIterator<Row> iterator = tableResult.collect(); - JobID jobId = tableResult.getJobClient().get().getJobID(); + Optional<JobClient> optionalJobClient = tableResult.getJobClient(); + assertThat(optionalJobClient).isPresent(); + JobID jobId = optionalJobClient.get().getJobID(); // trigger failover after some snapshot splits read finished if (failoverPhase == PostgresTestUtils.FailoverPhase.SNAPSHOT && iterator.hasNext()) { @@ -1051,12 +1068,12 @@ class PostgresSourceITCase extends PostgresTestBase { throws Exception { waitUntilJobRunning(tableResult); CloseableIterator<Row> iterator = tableResult.collect(); - JobID jobId = tableResult.getJobClient().get().getJobID(); + Optional<JobClient> optionalJobClient = tableResult.getJobClient(); + assertThat(optionalJobClient).isPresent(); + JobID jobId = optionalJobClient.get().getJobID(); - for (String tableId : captureCustomerTables) { - makeFirstPartStreamEvents( - getConnection(), - customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId); + for (String tableName : captureCustomerTables) { + makeFirstPartStreamEvents(getConnection(), new TestTableId(SCHEMA_NAME, tableName)); } // wait for the stream reading @@ -1070,10 +1087,8 @@ class PostgresSourceITCase extends PostgresTestBase { () -> sleepMs(200)); waitUntilJobRunning(tableResult); } - for (String tableId : captureCustomerTables) { - makeSecondPartStreamEvents( - getConnection(), - customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId); + for (String tableName : captureCustomerTables) { + makeSecondPartStreamEvents(getConnection(), new TestTableId(SCHEMA_NAME, tableName)); } List<String> expectedStreamData = new ArrayList<>(); @@ -1096,7 +1111,9 @@ class PostgresSourceITCase extends PostgresTestBase { throws Exception { waitUntilJobRunning(tableResult); CloseableIterator<Row> iterator = tableResult.collect(); - JobID jobId = tableResult.getJobClient().get().getJobID(); + Optional<JobClient> optionalJobClient = tableResult.getJobClient(); + assertThat(optionalJobClient).isPresent(); + JobID jobId = optionalJobClient.get().getJobID(); final AtomicLong savedCheckpointId = new AtomicLong(0); final CountDownLatch countDownLatch = new CountDownLatch(1); @@ -1107,14 +1124,9 @@ class PostgresSourceITCase extends PostgresTestBase { if (savedCheckpointId.get() == 0) { savedCheckpointId.set(checkpointId); - for (String tableId : captureCustomerTables) { + for (String tableName : captureCustomerTables) { makeFirstPartStreamEvents( - getConnection(), - customDatabase.getDatabaseName() - + '.' - + SCHEMA_NAME - + '.' 
- + tableId); + getConnection(), new TestTableId(SCHEMA_NAME, tableName)); } // wait for the stream reading Thread.sleep(2000L); @@ -1142,10 +1154,8 @@ class PostgresSourceITCase extends PostgresTestBase { () -> sleepMs(200)); waitUntilJobRunning(tableResult); } - for (String tableId : captureCustomerTables) { - makeSecondPartStreamEvents( - getConnection(), - customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId); + for (String tableName : captureCustomerTables) { + makeSecondPartStreamEvents(getConnection(), new TestTableId(SCHEMA_NAME, tableName)); } List<String> expectedStreamData = new ArrayList<>(); @@ -1168,12 +1178,12 @@ class PostgresSourceITCase extends PostgresTestBase { throws Exception { waitUntilJobRunning(tableResult); CloseableIterator<Row> iterator = tableResult.collect(); - JobID jobId = tableResult.getJobClient().get().getJobID(); + Optional<JobClient> optionalJobClient = tableResult.getJobClient(); + assertThat(optionalJobClient).isPresent(); + JobID jobId = optionalJobClient.get().getJobID(); - for (String tableId : captureCustomerTables) { - makeFirstPartStreamEvents( - getConnection(), - customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId); + for (String tableName : captureCustomerTables) { + makeFirstPartStreamEvents(getConnection(), new TestTableId(SCHEMA_NAME, tableName)); } // wait for the stream reading @@ -1186,15 +1196,10 @@ class PostgresSourceITCase extends PostgresTestBase { jobId, miniClusterResource.get().getMiniCluster(), () -> { - for (String tableId : captureCustomerTables) { + for (String tableName : captureCustomerTables) { try { makeSecondPartStreamEvents( - getConnection(), - customDatabase.getDatabaseName() - + '.' - + SCHEMA_NAME - + '.' - + tableId); + getConnection(), new TestTableId(SCHEMA_NAME, tableName)); } catch (SQLException e) { throw new RuntimeException(e); } @@ -1223,12 +1228,12 @@ class PostgresSourceITCase extends PostgresTestBase { String[] captureCustomerTables) throws Exception { waitUntilJobRunning(tableResult); - JobID jobId = tableResult.getJobClient().get().getJobID(); + Optional<JobClient> optionalJobClient = tableResult.getJobClient(); + assertThat(optionalJobClient).isPresent(); + JobID jobId = optionalJobClient.get().getJobID(); - for (String tableId : captureCustomerTables) { - makeFirstPartStreamEvents( - getConnection(), - customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId); + for (String tableName : captureCustomerTables) { + makeFirstPartStreamEvents(getConnection(), new TestTableId(SCHEMA_NAME, tableName)); } // wait for the stream reading and isCommitOffset is true @@ -1248,10 +1253,8 @@ class PostgresSourceITCase extends PostgresTestBase { } // wait for the stream reading and isCommitOffset is true Thread.sleep(30000L); - for (String tableId : captureCustomerTables) { - makeSecondPartStreamEvents( - getConnection(), - customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId); + for (String tableName : captureCustomerTables) { + makeSecondPartStreamEvents(getConnection(), new TestTableId(SCHEMA_NAME, tableName)); } Thread.sleep(5000L); try (PostgresConnection connection = getConnection()) { @@ -1301,17 +1304,19 @@ class PostgresSourceITCase extends PostgresTestBase { * Make some changes on the specified customer table. Changelog in string could be accessed by * {@link #firstPartStreamEvents}. 
*/ - private void makeFirstPartStreamEvents(JdbcConnection connection, String tableId) + private void makeFirstPartStreamEvents(JdbcConnection connection, TestTableId tableId) throws SQLException { try { connection.setAutoCommit(false); // make stream events for the first split connection.execute( - "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", - "DELETE FROM " + tableId + " where id = 102", - "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')", - "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103"); + "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 103", + "DELETE FROM " + tableId.toSql() + " where \"Id\" = 102", + "INSERT INTO " + + tableId.toSql() + + " VALUES(102, 'user_2', 'Shanghai', '123567891234')", + "UPDATE " + tableId.toSql() + " SET address = 'Shanghai' where \"Id\" = 103"); connection.commit(); } finally { connection.close(); @@ -1322,19 +1327,20 @@ class PostgresSourceITCase extends PostgresTestBase { * Make some other changes on the specified customer table. Changelog in string could be * accessed by {@link #secondPartStreamEvents}. */ - private void makeSecondPartStreamEvents(JdbcConnection connection, String tableId) + private void makeSecondPartStreamEvents(JdbcConnection connection, TestTableId tableId) throws SQLException { try { connection.setAutoCommit(false); // make stream events for split-1 - connection.execute("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 1010"); + connection.execute( + "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 1010"); connection.commit(); // make stream events for the last split connection.execute( "INSERT INTO " - + tableId + + tableId.toSql() + " VALUES(2001, 'user_22','Shanghai','123567891234')," + " (2002, 'user_23','Shanghai','123567891234')," + "(2003, 'user_24','Shanghai','123567891234')"); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java index f31089606..509064c5f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java @@ -36,13 +36,13 @@ import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConf import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory; import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory; import org.apache.flink.cdc.connectors.postgres.testutils.RecordsFormatter; +import org.apache.flink.cdc.connectors.postgres.testutils.TestTableId; import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.types.DataType; import io.debezium.connector.postgresql.connection.PostgresConnection; -import io.debezium.relational.TableId; import org.apache.kafka.connect.source.SourceRecord; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; @@ 
-60,7 +60,7 @@ class PostgresScanFetchTaskTest extends PostgresTestBase { private static final int USE_PRE_HIGHWATERMARK_HOOK = 2; private static final String schemaName = "customer"; - private static final String tableName = "customers"; + private static final String tableName = "Customers"; private final UniqueDatabase customDatabase = new UniqueDatabase( @@ -74,15 +74,17 @@ class PostgresScanFetchTaskTest extends PostgresTestBase { void testChangingDataInSnapshotScan() throws Exception { customDatabase.createAndInitialize(); - String tableId = schemaName + "." + tableName; + TestTableId tableId = new TestTableId(schemaName, tableName); String[] changingDataSql = new String[] { - "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", - "DELETE FROM " + tableId + " where id = 102", - "INSERT INTO " + tableId + " VALUES(102, 'user_2','hangzhou','123567891234')", - "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103", - "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 110", - "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 111", + "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 103", + "DELETE FROM " + tableId.toSql() + " where \"Id\" = 102", + "INSERT INTO " + + tableId.toSql() + + " VALUES(102, 'user_2','hangzhou','123567891234')", + "UPDATE " + tableId.toSql() + " SET address = 'Shanghai' where \"Id\" = 103", + "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 110", + "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 111", }; String[] expected = @@ -106,11 +108,15 @@ class PostgresScanFetchTaskTest extends PostgresTestBase { @Test void testInsertDataInSnapshotScan() throws Exception { customDatabase.createAndInitialize(); - String tableId = schemaName + "." + tableName; + TestTableId tableId = new TestTableId(schemaName, tableName); String[] insertDataSql = new String[] { - "INSERT INTO " + tableId + " VALUES(112, 'user_12','Shanghai','123567891234')", - "INSERT INTO " + tableId + " VALUES(113, 'user_13','Shanghai','123567891234')", + "INSERT INTO " + + tableId.toSql() + + " VALUES(112, 'user_12','Shanghai','123567891234')", + "INSERT INTO " + + tableId.toSql() + + " VALUES(113, 'user_13','Shanghai','123567891234')", }; String[] expected = new String[] { @@ -135,11 +141,11 @@ class PostgresScanFetchTaskTest extends PostgresTestBase { @Test void testDeleteDataInSnapshotScan() throws Exception { customDatabase.createAndInitialize(); - String tableId = schemaName + "." + tableName; + TestTableId tableId = new TestTableId(schemaName, tableName); String[] deleteDataSql = new String[] { - "DELETE FROM " + tableId + " where id = 101", - "DELETE FROM " + tableId + " where id = 102", + "DELETE FROM " + tableId.toSql() + " where \"Id\" = 101", + "DELETE FROM " + tableId.toSql() + " where \"Id\" = 102", }; String[] expected = new String[] { @@ -161,15 +167,17 @@ class PostgresScanFetchTaskTest extends PostgresTestBase { void testSnapshotScanSkipBackfillWithPostLowWatermark() throws Exception { customDatabase.createAndInitialize(); - String tableId = schemaName + "." 
+ tableName; + TestTableId tableId = new TestTableId(schemaName, tableName); String[] changingDataSql = new String[] { - "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", - "DELETE FROM " + tableId + " where id = 102", - "INSERT INTO " + tableId + " VALUES(102, 'user_2','hangzhou','123567891234')", - "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103", - "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 110", - "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 111", + "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 103", + "DELETE FROM " + tableId.toSql() + " where \"Id\" = 102", + "INSERT INTO " + + tableId.toSql() + + " VALUES(102, 'user_2','hangzhou','123567891234')", + "UPDATE " + tableId.toSql() + " SET address = 'Shanghai' where \"Id\" = 103", + "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 110", + "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 111", }; String[] expected = @@ -196,15 +204,17 @@ class PostgresScanFetchTaskTest extends PostgresTestBase { void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception { customDatabase.createAndInitialize(); - String tableId = schemaName + "." + tableName; + TestTableId tableId = new TestTableId(schemaName, tableName); String[] changingDataSql = new String[] { - "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103", - "DELETE FROM " + tableId + " where id = 102", - "INSERT INTO " + tableId + " VALUES(102, 'user_2','hangzhou','123567891234')", - "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103", - "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 110", - "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 111", + "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 103", + "DELETE FROM " + tableId.toSql() + " where \"Id\" = 102", + "INSERT INTO " + + tableId.toSql() + + " VALUES(102, 'user_2','hangzhou','123567891234')", + "UPDATE " + tableId.toSql() + " SET address = 'Shanghai' where \"Id\" = 103", + "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 110", + "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 111", }; String[] expected = @@ -261,8 +271,8 @@ class PostgresScanFetchTaskTest extends PostgresTestBase { final DataType dataType = DataTypes.ROW( - DataTypes.FIELD("id", DataTypes.BIGINT()), - DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("Id", DataTypes.BIGINT()), + DataTypes.FIELD("Name", DataTypes.STRING()), DataTypes.FIELD("address", DataTypes.STRING()), DataTypes.FIELD("phone_number", DataTypes.STRING())); @@ -318,7 +328,8 @@ class PostgresScanFetchTaskTest extends PostgresTestBase { private List<SnapshotSplit> getSnapshotSplits( PostgresSourceConfig sourceConfig, JdbcDataSourceDialect sourceDialect) throws Exception { - List<TableId> discoverTables = sourceDialect.discoverDataCollections(sourceConfig); + List<io.debezium.relational.TableId> discoverTables = + sourceDialect.discoverDataCollections(sourceConfig); OffsetFactory offsetFactory = new PostgresOffsetFactory(); final SnapshotSplitAssigner snapshotSplitAssigner = new SnapshotSplitAssigner<JdbcSourceConfig>( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
index 1c855cc3f..5f7ab8dbc 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
@@ -79,11 +79,7 @@ class PostgresSourceReaderTest {
         configFactory.password("password");
         configFactory.setLsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay);
         final TestTable customerTable =
-                new TestTable(
-                        "pgdb",
-                        "customer",
-                        "customers",
-                        ResolvedSchema.of(Column.physical("id", BIGINT())));
+                new TestTable(ResolvedSchema.of(Column.physical("id", BIGINT())));
         final DebeziumDeserializationSchema<?> deserializer = customerTable.getDeserializer();
         MockPostgresDialect dialect = new MockPostgresDialect(configFactory.create(0));
         final PostgresSourceBuilder.PostgresIncrementalSource<?> source =
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTable.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTable.java
index d941bb7fa..ddf3c98e5 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTable.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTable.java
@@ -34,11 +34,6 @@ import java.util.List;
  */
 public class TestTable {

-    private final String databaseName;
-    private final String tableName;
-
-    private final String schemaName;
-
     private final ResolvedSchema schema;

     // Lazily initialized components
@@ -46,23 +41,10 @@ public class TestTable {
     private RowDataDebeziumDeserializeSchema deserializer;
     private RecordsFormatter recordsFormatter;

-    public TestTable(
-            UniqueDatabase database, String schemaName, String tableName, ResolvedSchema schema) {
-        this(database.getDatabaseName(), schemaName, tableName, schema);
-    }
-
-    public TestTable(
-            String databaseName, String schemaName, String tableName, ResolvedSchema schema) {
-        this.databaseName = databaseName;
-        this.schemaName = schemaName;
-        this.tableName = tableName;
+    public TestTable(ResolvedSchema schema) {
         this.schema = schema;
     }

-    public String getTableId() {
-        return String.format("%s.%s", schemaName, tableName);
-    }
-
     public RowType getRowType() {
         return (RowType) schema.toPhysicalRowDataType().getLogicalType();
     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTableId.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTableId.java
new file mode 100644
index 000000000..14e6b903a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTableId.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.testutils;
+
+import static org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils.quote;
+
+/** Represents a qualified table name. */
+public class TestTableId {
+    private final String schemaName;
+    private final String tableName;
+
+    public TestTableId(String schemaName, String tableName) {
+        this.schemaName = schemaName;
+        this.tableName = tableName;
+    }
+
+    /** Returns the qualified name to be used in connector configuration. */
+    public String toString() {
+        return String.format("%s.%s", schemaName, tableName);
+    }
+
+    /** Returns the qualified name to be used in SQL. */
+    public String toSql() {
+        return String.format("%s.%s", quote(schemaName), quote(tableName));
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/customer.sql
index 52a1810d5..3626589aa 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/customer.sql
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/customer.sql
@@ -18,15 +18,15 @@ CREATE SCHEMA customer;
 SET search_path TO customer;

 -- Create and populate our users using a single insert with many rows
-CREATE TABLE customers (
-  id INTEGER NOT NULL PRIMARY KEY,
-  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+CREATE TABLE "Customers" (
+  "Id" INTEGER NOT NULL PRIMARY KEY,
+  "Name" VARCHAR(255) NOT NULL DEFAULT 'flink',
   address VARCHAR(1024),
   phone_number VARCHAR(512)
 );
-ALTER TABLE customers REPLICA IDENTITY FULL;
+ALTER TABLE "Customers" REPLICA IDENTITY FULL;

-INSERT INTO customers
+INSERT INTO "Customers"
 VALUES (101,'user_1','Shanghai','123567891234'),
        (102,'user_2','Shanghai','123567891234'),
        (103,'user_3','Shanghai','123567891234'),
@@ -51,8 +51,8 @@ VALUES (101,'user_1','Shanghai','123567891234'),

 -- table has same name prefix with 'customers.*'
 CREATE TABLE customers_1 (
-  id INTEGER NOT NULL PRIMARY KEY,
-  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  "Id" INTEGER NOT NULL PRIMARY KEY,
+  "Name" VARCHAR(255) NOT NULL DEFAULT 'flink',
   address VARCHAR(1024),
   phone_number VARCHAR(512)
 );
@@ -82,8 +82,8 @@ VALUES (101,'user_1','Shanghai','123567891234'),
        (2000,'user_21','Shanghai','123567891234');

 CREATE TABLE customers_no_pk (
-  id INTEGER NOT NULL,
-  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  "Id" INTEGER NOT NULL,
+  "Name" VARCHAR(255) NOT NULL DEFAULT 'flink',
   address VARCHAR(1024),
   phone_number VARCHAR(512)
 );
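
For context on the behavior this patch addresses: PostgreSQL folds unquoted identifiers to lower case, so SQL generated without quoting resolves customer.Customers as customer.customers and fails for tables and columns created with upper-case names. The snippet below is a minimal standalone JDBC sketch, not connector code, illustrating the two changes in PostgresQueryUtils: identifier quoting with embedded double quotes doubled, and the reltuples row-count estimate bound through a prepared statement instead of being interpolated into to_regclass(). The connection URL, credentials, and the QuotingSketch class name are placeholders; it assumes a reachable PostgreSQL instance with the customer.sql schema loaded and the PostgreSQL JDBC driver on the classpath.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    public class QuotingSketch {

        // Same rule as the patched PostgresQueryUtils.quote(): wrap the name in
        // double quotes and escape embedded double quotes by doubling them.
        static String quote(String name) {
            return "\"" + name.replace("\"", "\"\"") + "\"";
        }

        public static void main(String[] args) throws Exception {
            try (Connection conn =
                    DriverManager.getConnection(
                            "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres")) {

                // Unquoted, this would be folded to customer.customers and fail;
                // quoted, it resolves the table and column exactly as created.
                String select =
                        "SELECT " + quote("Id")
                                + " FROM " + quote("customer") + "." + quote("Customers");
                try (PreparedStatement ps = conn.prepareStatement(select);
                        ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        System.out.println("Id = " + rs.getLong(1));
                    }
                }

                // Row-count estimate with schema and table bound as parameters.
                // pg_class/pg_namespace store names in their exact case, so the
                // values are passed unquoted; requires a prior ANALYZE or VACUUM.
                String estimate =
                        "SELECT reltuples::bigint"
                                + " FROM pg_class c"
                                + " JOIN pg_namespace n ON n.oid = c.relnamespace"
                                + " WHERE n.nspname = ? AND c.relname = ?";
                try (PreparedStatement ps = conn.prepareStatement(estimate)) {
                    ps.setString(1, "customer");
                    ps.setString(2, "Customers");
                    try (ResultSet rs = ps.executeQuery()) {
                        if (rs.next()) {
                            System.out.println("estimated rows: " + rs.getLong(1));
                        }
                    }
                }
            }
        }
    }

Parameter binding works for data values and catalog lookups, but not for identifiers inside SQL text, which is why the split-scan query builder still concatenates quoted identifiers into the generated statements.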