This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new da40727d2 [FLINK-37371][cdc-connector/postgresql] Postgres CDC incremental source fails to handle upper-case table and column names
da40727d2 is described below

commit da40727d29c8e3d3c637a52d102b3ed636ebf13a
Author: Sergei Morozov <moro...@tut.by>
AuthorDate: Thu Mar 27 05:11:46 2025 -0700

    [FLINK-37371][cdc-connector/postgresql] Postgres CDC incremental source fails to handle upper-case table and column names
    
    This closes #3929.
---
 .../postgres/source/utils/PostgresQueryUtils.java  |  25 ++-
 .../postgres/source/PostgresDialectTest.java       |   4 +-
 .../postgres/source/PostgresSourceITCase.java      | 210 +++++++++++----------
 .../source/fetch/PostgresScanFetchTaskTest.java    |  75 ++++----
 .../source/reader/PostgresSourceReaderTest.java    |   6 +-
 .../connectors/postgres/testutils/TestTable.java   |  20 +-
 .../connectors/postgres/testutils/TestTableId.java |  41 ++++
 .../src/test/resources/ddl/customer.sql            |  18 +-
 8 files changed, 220 insertions(+), 179 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
index e7408be34..9c0d55847 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/PostgresQueryUtils.java
@@ -69,12 +69,17 @@ public class PostgresQueryUtils {
         // https://stackoverflow.com/questions/7943233/fast-way-to-discover-the-row-count-of-a-table-in-postgresql
         // NOTE: it requires ANALYZE or VACUUM to be run first in PostgreSQL.
         final String query =
-                String.format(
-                        "SELECT reltuples::bigint FROM pg_class WHERE oid = to_regclass('%s')",
-                        tableId.toString());
-
-        return jdbc.queryAndMap(
+                "SELECT reltuples::bigint"
+                        + " FROM pg_class c"
+                        + " JOIN pg_namespace n ON n.oid = c.relnamespace"
+                        + " WHERE n.nspname = ?"
+                        + " AND c.relname = ?";
+        return jdbc.prepareQueryAndMap(
                 query,
+                ps -> {
+                    ps.setString(1, tableId.schema());
+                    ps.setString(2, tableId.table());
+                },
                 rs -> {
                     if (!rs.next()) {
                         throw new SQLException(
@@ -244,8 +249,8 @@ public class PostgresQueryUtils {
         }
     }
 
-    public static String quote(String dbOrTableName) {
-        return "\"" + dbOrTableName + "\"";
+    public static String quote(String name) {
+        return "\"" + name.replace("\"", "\"\"") + "\"";
     }
 
     private static String quoteForMinMax(Column column) {
@@ -292,7 +297,7 @@ public class PostgresQueryUtils {
                 fieldNamesIt.hasNext(); ) {
             String fieldName = fieldNamesIt.next();
             boolean isUUID = uuidFields.contains(fieldName);
-            sql.append(fieldName).append(predicate).append(castParam(isUUID));
+            sql.append(quote(fieldName)).append(predicate).append(castParam(isUUID));
             if (fieldNamesIt.hasNext()) {
                 sql.append(" AND ");
             }
@@ -303,7 +308,7 @@ public class PostgresQueryUtils {
         StringBuilder sql = new StringBuilder();
         for (Iterator<String> fieldNamesIt = pkRowType.getFieldNames().iterator();
                 fieldNamesIt.hasNext(); ) {
-            sql.append(fieldNamesIt.next());
+            sql.append(quote(fieldNamesIt.next()));
             if (fieldNamesIt.hasNext()) {
                 sql.append(" , ");
             }
@@ -315,7 +320,7 @@ public class PostgresQueryUtils {
         StringBuilder sql = new StringBuilder();
         for (Iterator<String> fieldNamesIt = pkRowType.getFieldNames().iterator();
                 fieldNamesIt.hasNext(); ) {
-            sql.append("MAX(" + fieldNamesIt.next() + ")");
+            sql.append("MAX(" + quote(fieldNamesIt.next()) + ")");
             if (fieldNamesIt.hasNext()) {
                 sql.append(" , ");
             }
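
Note on the change above: the row-count estimate now resolves the table through
pg_class/pg_namespace with bind parameters instead of interpolating the raw name
into to_regclass(), and every identifier that reaches generated SQL passes
through quote(). The quoting matters because PostgreSQL folds unquoted
identifiers to lower case, so a mixed-case name such as Customers is only
addressable as "Customers". A minimal standalone sketch of the patched helper
(the QuoteDemo class and main() harness are illustrative, not part of the
commit):

    public class QuoteDemo {
        // Mirrors PostgresQueryUtils.quote(): wrap the identifier in double
        // quotes and escape embedded double quotes by doubling them.
        static String quote(String name) {
            return "\"" + name.replace("\"", "\"\"") + "\"";
        }

        public static void main(String[] args) {
            System.out.println(quote("Customers")); // -> "Customers"
            System.out.println(quote("a\"b"));      // -> "a""b"
        }
    }
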
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java
index b92f1c662..898cec1fb 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java
@@ -56,13 +56,13 @@ class PostgresDialectTest extends PostgresTestBase {
         // get table named 'customer.customers' from customDatabase which is actual in
         // inventoryDatabase
         PostgresSourceConfigFactory configFactoryOfCustomDatabase =
-                getMockPostgresSourceConfigFactory(customDatabase, "customer", "customers", 10);
+                getMockPostgresSourceConfigFactory(customDatabase, "customer", "Customers", 10);
         PostgresDialect dialectOfcustomDatabase =
                 new PostgresDialect(configFactoryOfCustomDatabase.create(0));
         List<TableId> tableIdsOfcustomDatabase =
                 dialectOfcustomDatabase.discoverDataCollections(
                         configFactoryOfCustomDatabase.create(0));
-        Assertions.assertThat(tableIdsOfcustomDatabase.get(0)).hasToString("customer.customers");
+        Assertions.assertThat(tableIdsOfcustomDatabase.get(0)).hasToString("customer.Customers");
 
         // get table named 'inventory.products' from customDatabase which is actual in
         // inventoryDatabase
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
index 708e2ec1e..d2c50d15b 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceITCase.java
@@ -27,8 +27,10 @@ import org.apache.flink.cdc.connectors.postgres.PostgresTestBase;
 import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
 import org.apache.flink.cdc.connectors.postgres.testutils.PostgresTestUtils;
 import org.apache.flink.cdc.connectors.postgres.testutils.TestTable;
+import org.apache.flink.cdc.connectors.postgres.testutils.TestTableId;
 import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
 import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -56,6 +58,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -64,6 +67,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -155,7 +159,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                 1,
                 PostgresTestUtils.FailoverType.NONE,
                 PostgresTestUtils.FailoverPhase.NEVER,
-                new String[] {"customers"},
+                new String[] {"Customers"},
                 scanStartupMode);
     }
 
@@ -166,7 +170,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                 4,
                 PostgresTestUtils.FailoverType.NONE,
                 PostgresTestUtils.FailoverPhase.NEVER,
-                new String[] {"customers"},
+                new String[] {"Customers"},
                 scanStartupMode);
     }
 
@@ -177,7 +181,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                 1,
                 PostgresTestUtils.FailoverType.NONE,
                 PostgresTestUtils.FailoverPhase.NEVER,
-                new String[] {"customers", "customers_1"},
+                new String[] {"Customers", "customers_1"},
                 scanStartupMode);
     }
 
@@ -188,7 +192,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                 4,
                 PostgresTestUtils.FailoverType.NONE,
                 PostgresTestUtils.FailoverPhase.NEVER,
-                new String[] {"customers", "customers_1"},
+                new String[] {"Customers", "customers_1"},
                 scanStartupMode);
     }
 
@@ -199,7 +203,7 @@ class PostgresSourceITCase extends PostgresTestBase {
         testPostgresParallelSource(
                 PostgresTestUtils.FailoverType.TM,
                 PostgresTestUtils.FailoverPhase.SNAPSHOT,
-                new String[] {"customers", "customers_1"},
+                new String[] {"Customers", "customers_1"},
                 scanStartupMode);
     }
 
@@ -209,7 +213,7 @@ class PostgresSourceITCase extends PostgresTestBase {
         testPostgresParallelSource(
                 PostgresTestUtils.FailoverType.TM,
                 PostgresTestUtils.FailoverPhase.STREAM,
-                new String[] {"customers", "customers_1"},
+                new String[] {"Customers", "customers_1"},
                 scanStartupMode);
     }
 
@@ -219,7 +223,7 @@ class PostgresSourceITCase extends PostgresTestBase {
         testPostgresParallelSource(
                 PostgresTestUtils.FailoverType.JM,
                 PostgresTestUtils.FailoverPhase.SNAPSHOT,
-                new String[] {"customers", "customers_1"},
+                new String[] {"Customers", "customers_1"},
                 scanStartupMode);
     }
 
@@ -229,7 +233,7 @@ class PostgresSourceITCase extends PostgresTestBase {
         testPostgresParallelSource(
                 PostgresTestUtils.FailoverType.JM,
                 PostgresTestUtils.FailoverPhase.STREAM,
-                new String[] {"customers", "customers_1"},
+                new String[] {"Customers", "customers_1"},
                 scanStartupMode);
     }
 
@@ -240,7 +244,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                 1,
                 PostgresTestUtils.FailoverType.TM,
                 PostgresTestUtils.FailoverPhase.SNAPSHOT,
-                new String[] {"customers"},
+                new String[] {"Customers"},
                 scanStartupMode);
     }
 
@@ -251,7 +255,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                 1,
                 PostgresTestUtils.FailoverType.JM,
                 PostgresTestUtils.FailoverPhase.SNAPSHOT,
-                new String[] {"customers"},
+                new String[] {"Customers"},
                 scanStartupMode);
     }
 
@@ -296,7 +300,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                 DEFAULT_SCAN_STARTUP_MODE,
                 PostgresTestUtils.FailoverType.TM,
                 PostgresTestUtils.FailoverPhase.SNAPSHOT,
-                new String[] {"customers"},
+                new String[] {"Customers"},
                 RestartStrategies.fixedDelayRestart(1, 0),
                 Collections.singletonMap("scan.incremental.snapshot.backfill.skip", "true"));
     }
@@ -313,11 +317,11 @@ class PostgresSourceITCase extends PostgresTestBase {
         String sourceDDL =
                 format(
                         "CREATE TABLE customers ("
-                                + " id BIGINT NOT NULL,"
-                                + " name STRING,"
+                                + " Id BIGINT NOT NULL,"
+                                + " Name STRING,"
                                 + " address STRING,"
                                 + " phone_number STRING,"
-                                + " primary key (id) not enforced"
+                                + " primary key (Id) not enforced"
                                 + ") WITH ("
                                 + " 'connector' = 'postgres-cdc',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
@@ -339,7 +343,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                         customDatabase.getPassword(),
                         customDatabase.getDatabaseName(),
                         SCHEMA_NAME,
-                        "customers",
+                        "Customers",
                         scanStartupMode,
                         slotName);
         tEnv.executeSql(sourceDDL);
@@ -351,7 +355,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                     tableResult,
                     PostgresTestUtils.FailoverType.JM,
                     PostgresTestUtils.FailoverPhase.STREAM,
-                    new String[] {"customers"});
+                    new String[] {"Customers"});
         }
 
         // second step: check the stream data
@@ -359,9 +363,11 @@ class PostgresSourceITCase extends PostgresTestBase {
                 tableResult,
                 PostgresTestUtils.FailoverType.JM,
                 PostgresTestUtils.FailoverPhase.STREAM,
-                new String[] {"customers"});
+                new String[] {"Customers"});
 
-        tableResult.getJobClient().get().cancel().get();
+        Optional<JobClient> optionalJobClient = tableResult.getJobClient();
+        assertThat(optionalJobClient).isPresent();
+        optionalJobClient.get().cancel().get();
     }
 
     @ParameterizedTest
@@ -603,7 +609,7 @@ class PostgresSourceITCase extends PostgresTestBase {
         int parallelism = 1;
         PostgresTestUtils.FailoverType failoverType = PostgresTestUtils.FailoverType.JM;
         PostgresTestUtils.FailoverPhase failoverPhase = PostgresTestUtils.FailoverPhase.STREAM;
-        String[] captureCustomerTables = new String[] {"customers"};
+        String[] captureCustomerTables = new String[] {"Customers"};
         RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
                 RestartStrategies.fixedDelayRestart(1, 0);
         boolean skipSnapshotBackfill = false;
@@ -617,11 +623,11 @@ class PostgresSourceITCase extends PostgresTestBase {
         String sourceDDL =
                 format(
                         "CREATE TABLE customers ("
-                                + " id BIGINT NOT NULL,"
-                                + " name STRING,"
+                                + " Id BIGINT NOT NULL,"
+                                + " Name STRING,"
                                 + " address STRING,"
                                 + " phone_number STRING,"
-                                + " primary key (id) not enforced"
+                                + " primary key (Id) not enforced"
                                 + ") WITH ("
                                 + " 'connector' = 'postgres-cdc-mock',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
@@ -658,7 +664,9 @@ class PostgresSourceITCase extends PostgresTestBase {
         // second step: check the stream data
         checkStreamDataWithHook(tableResult, failoverType, failoverPhase, captureCustomerTables);
 
-        tableResult.getJobClient().get().cancel().get();
+        Optional<JobClient> optionalJobClient = tableResult.getJobClient();
+        assertThat(optionalJobClient).isPresent();
+        optionalJobClient.get().cancel().get();
 
         // sleep 1000ms to wait until connections are closed.
         Thread.sleep(1000L);
@@ -675,7 +683,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                     scanStartupMode,
                     PostgresTestUtils.FailoverType.NONE,
                     PostgresTestUtils.FailoverPhase.NEVER,
-                    new String[] {"customers"},
+                    new String[] {"Customers"},
                     RestartStrategies.noRestart(),
                     Collections.singletonMap(
                             "scan.incremental.snapshot.chunk.key-column", chunkColumn));
@@ -684,7 +692,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                     .hasStackTraceContaining(
                             String.format(
                                     "Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
-                                    chunkColumn, "id", "customer.customers"));
+                                    chunkColumn, "Id", "customer.Customers"));
         }
     }
 
@@ -709,7 +717,7 @@ class PostgresSourceITCase extends PostgresTestBase {
                 scanStartupMode,
                 PostgresTestUtils.FailoverType.NONE,
                 PostgresTestUtils.FailoverPhase.NEVER,
-                new String[] {"customers"},
+                new String[] {"Customers"},
                 RestartStrategies.noRestart(),
                 options);
         try (PostgresConnection connection = getConnection()) {
@@ -723,7 +731,7 @@ class PostgresSourceITCase extends PostgresTestBase {
         int parallelism = 1;
         PostgresTestUtils.FailoverType failoverType = PostgresTestUtils.FailoverType.TM;
         PostgresTestUtils.FailoverPhase failoverPhase = PostgresTestUtils.FailoverPhase.STREAM;
-        String[] captureCustomerTables = new String[] {"customers"};
+        String[] captureCustomerTables = new String[] {"Customers"};
         RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
                 RestartStrategies.fixedDelayRestart(1, 0);
         boolean skipSnapshotBackfill = false;
@@ -737,11 +745,11 @@ class PostgresSourceITCase extends PostgresTestBase {
         String sourceDDL =
                 format(
                         "CREATE TABLE customers ("
-                                + " id BIGINT NOT NULL,"
-                                + " name STRING,"
+                                + " Id BIGINT NOT NULL,"
+                                + " Name STRING,"
                                 + " address STRING,"
                                 + " phone_number STRING,"
-                                + " primary key (id) not enforced"
+                                + " primary key (Id) not enforced"
                                 + ") WITH ("
                                 + " 'connector' = 'postgres-cdc',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
@@ -780,7 +788,9 @@ class PostgresSourceITCase extends PostgresTestBase {
         // second step: check the stream data
         checkStreamDataWithTestLsn(tableResult, failoverType, failoverPhase, captureCustomerTables);
 
-        tableResult.getJobClient().get().cancel().get();
+        Optional<JobClient> optionalJobClient = tableResult.getJobClient();
+        assertThat(optionalJobClient).isPresent();
+        optionalJobClient.get().cancel().get();
 
         // sleep 1000ms to wait until connections are closed.
         Thread.sleep(1000L);
@@ -799,17 +809,16 @@ class PostgresSourceITCase extends PostgresTestBase {
         ResolvedSchema customersSchema =
                 new ResolvedSchema(
                         Arrays.asList(
-                                physical("id", BIGINT().notNull()),
-                                physical("name", STRING()),
+                                physical("Id", BIGINT().notNull()),
+                                physical("Name", STRING()),
                                 physical("address", STRING()),
                                 physical("phone_number", STRING())),
                         new ArrayList<>(),
                         UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
-        TestTable customerTable =
-                new TestTable(customDatabase, "customer", "customers", customersSchema);
-        String tableId = customerTable.getTableId();
+        TestTableId tableId = new TestTableId("customer", "Customers");
+        TestTable table = new TestTable(customersSchema);
 
-        PostgresSourceBuilder.PostgresIncrementalSource source =
+        PostgresSourceBuilder.PostgresIncrementalSource<RowData> source =
                 PostgresSourceBuilder.PostgresIncrementalSource.<RowData>builder()
                         .hostname(customDatabase.getHost())
                         .port(customDatabase.getDatabasePort())
@@ -817,11 +826,11 @@ class PostgresSourceITCase extends PostgresTestBase {
                         .password(customDatabase.getPassword())
                         .database(customDatabase.getDatabaseName())
                         .slotName(slotName)
-                        .tableList(tableId)
+                        .tableList(tableId.toString())
                         .startupOptions(startupOptions)
                         .skipSnapshotBackfill(skipSnapshotBackfill)
                         .lsnCommitCheckpointsDelay(1)
-                        .deserializer(customerTable.getDeserializer())
+                        .deserializer(table.getDeserializer())
                         .build();
 
         // Do some database operations during hook in snapshot period.
@@ -830,17 +839,21 @@ class PostgresSourceITCase extends PostgresTestBase {
                 new String[] {
                     String.format(
                             "INSERT INTO %s VALUES (15213, 'user_15213', 'Shanghai', '123567891234')",
-                            tableId),
-                    String.format("UPDATE %s SET address='Pittsburgh' WHERE id=2000", tableId),
-                    String.format("DELETE FROM %s WHERE id=1019", tableId)
+                            tableId.toSql()),
+                    String.format(
+                            "UPDATE %s SET address = 'Pittsburgh' WHERE \"Id\" = 2000",
+                            tableId.toSql()),
+                    String.format("DELETE FROM %s WHERE \"Id\" = 1019", tableId.toSql())
                 };
         SnapshotPhaseHook snapshotPhaseHook =
                 (sourceConfig, split) -> {
-                    PostgresDialect dialect =
-                            new PostgresDialect((PostgresSourceConfig) sourceConfig);
-                    try (PostgresConnection postgresConnection = dialect.openJdbcConnection()) {
+                    try (PostgresDialect dialect =
+                                    new PostgresDialect((PostgresSourceConfig) sourceConfig);
+                            PostgresConnection postgresConnection = dialect.openJdbcConnection()) {
                         postgresConnection.execute(statements);
                         postgresConnection.commit();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
                     }
                 };
 
@@ -861,7 +874,7 @@ class PostgresSourceITCase extends PostgresTestBase {
         try (CloseableIterator<RowData> iterator =
                 env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
                         .executeAndCollect()) {
-            records = fetchRowData(iterator, fetchSize, customerTable::stringify);
+            records = fetchRowData(iterator, fetchSize, table::stringify);
             env.close();
         }
         return records;
@@ -934,11 +947,11 @@ class PostgresSourceITCase extends PostgresTestBase {
         String sourceDDL =
                 format(
                         "CREATE TABLE customers ("
-                                + " id BIGINT NOT NULL,"
-                                + " name STRING,"
+                                + " Id BIGINT NOT NULL,"
+                                + " Name STRING,"
                                 + " address STRING,"
                                 + " phone_number STRING,"
-                                + " primary key (id) not enforced"
+                                + " primary key (Id) not enforced"
                                 + ") WITH ("
                                 + " 'connector' = 'postgres-cdc',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
@@ -953,7 +966,7 @@ class PostgresSourceITCase extends PostgresTestBase {
+                                + " 'scan.incremental.snapshot.chunk.size' = '100',"
                                 + " 'slot.name' = '%s',"
+                                + " 'scan.lsn-commit.checkpoints-num-delay' = '1'"
-                                + "%s"
+                                + " %s"
                                 + ")",
                         customDatabase.getHost(),
                         customDatabase.getDatabasePort(),
@@ -985,7 +998,9 @@ class PostgresSourceITCase extends PostgresTestBase {
         // second step: check the stream data
         checkStreamData(tableResult, failoverType, failoverPhase, captureCustomerTables);
 
-        tableResult.getJobClient().get().cancel().get();
+        Optional<JobClient> optionalJobClient = tableResult.getJobClient();
+        assertThat(optionalJobClient).isPresent();
+        optionalJobClient.get().cancel().get();
 
         // sleep 1000ms to wait until connections are closed.
         Thread.sleep(1000L);
@@ -1028,7 +1043,9 @@ class PostgresSourceITCase extends PostgresTestBase {
         }
 
         CloseableIterator<Row> iterator = tableResult.collect();
-        JobID jobId = tableResult.getJobClient().get().getJobID();
+        Optional<JobClient> optionalJobClient = tableResult.getJobClient();
+        assertThat(optionalJobClient).isPresent();
+        JobID jobId = optionalJobClient.get().getJobID();
 
         // trigger failover after some snapshot splits read finished
         if (failoverPhase == PostgresTestUtils.FailoverPhase.SNAPSHOT && iterator.hasNext()) {
@@ -1051,12 +1068,12 @@ class PostgresSourceITCase extends PostgresTestBase {
             throws Exception {
         waitUntilJobRunning(tableResult);
         CloseableIterator<Row> iterator = tableResult.collect();
-        JobID jobId = tableResult.getJobClient().get().getJobID();
+        Optional<JobClient> optionalJobClient = tableResult.getJobClient();
+        assertThat(optionalJobClient).isPresent();
+        JobID jobId = optionalJobClient.get().getJobID();
 
-        for (String tableId : captureCustomerTables) {
-            makeFirstPartStreamEvents(
-                    getConnection(),
-                    customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId);
+        for (String tableName : captureCustomerTables) {
+            makeFirstPartStreamEvents(getConnection(), new TestTableId(SCHEMA_NAME, tableName));
         }
 
         // wait for the stream reading
@@ -1070,10 +1087,8 @@ class PostgresSourceITCase extends PostgresTestBase {
                     () -> sleepMs(200));
             waitUntilJobRunning(tableResult);
         }
-        for (String tableId : captureCustomerTables) {
-            makeSecondPartStreamEvents(
-                    getConnection(),
-                    customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId);
+        for (String tableName : captureCustomerTables) {
+            makeSecondPartStreamEvents(getConnection(), new TestTableId(SCHEMA_NAME, tableName));
         }
 
         List<String> expectedStreamData = new ArrayList<>();
@@ -1096,7 +1111,9 @@ class PostgresSourceITCase extends PostgresTestBase {
             throws Exception {
         waitUntilJobRunning(tableResult);
         CloseableIterator<Row> iterator = tableResult.collect();
-        JobID jobId = tableResult.getJobClient().get().getJobID();
+        Optional<JobClient> optionalJobClient = tableResult.getJobClient();
+        assertThat(optionalJobClient).isPresent();
+        JobID jobId = optionalJobClient.get().getJobID();
 
         final AtomicLong savedCheckpointId = new AtomicLong(0);
         final CountDownLatch countDownLatch = new CountDownLatch(1);
@@ -1107,14 +1124,9 @@ class PostgresSourceITCase extends PostgresTestBase {
                         if (savedCheckpointId.get() == 0) {
                             savedCheckpointId.set(checkpointId);
 
-                            for (String tableId : captureCustomerTables) {
+                            for (String tableName : captureCustomerTables) {
                                 makeFirstPartStreamEvents(
-                                        getConnection(),
-                                        customDatabase.getDatabaseName()
-                                                + '.'
-                                                + SCHEMA_NAME
-                                                + '.'
-                                                + tableId);
+                                        getConnection(), new TestTableId(SCHEMA_NAME, tableName));
                             }
                             // wait for the stream reading
                             Thread.sleep(2000L);
@@ -1142,10 +1154,8 @@ class PostgresSourceITCase extends PostgresTestBase {
                     () -> sleepMs(200));
             waitUntilJobRunning(tableResult);
         }
-        for (String tableId : captureCustomerTables) {
-            makeSecondPartStreamEvents(
-                    getConnection(),
-                    customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId);
+        for (String tableName : captureCustomerTables) {
+            makeSecondPartStreamEvents(getConnection(), new TestTableId(SCHEMA_NAME, tableName));
         }
 
         List<String> expectedStreamData = new ArrayList<>();
@@ -1168,12 +1178,12 @@ class PostgresSourceITCase extends PostgresTestBase {
             throws Exception {
         waitUntilJobRunning(tableResult);
         CloseableIterator<Row> iterator = tableResult.collect();
-        JobID jobId = tableResult.getJobClient().get().getJobID();
+        Optional<JobClient> optionalJobClient = tableResult.getJobClient();
+        assertThat(optionalJobClient).isPresent();
+        JobID jobId = optionalJobClient.get().getJobID();
 
-        for (String tableId : captureCustomerTables) {
-            makeFirstPartStreamEvents(
-                    getConnection(),
-                    customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId);
+        for (String tableName : captureCustomerTables) {
+            makeFirstPartStreamEvents(getConnection(), new TestTableId(SCHEMA_NAME, tableName));
         }
 
         // wait for the stream reading
@@ -1186,15 +1196,10 @@ class PostgresSourceITCase extends PostgresTestBase {
                     jobId,
                     miniClusterResource.get().getMiniCluster(),
                     () -> {
-                        for (String tableId : captureCustomerTables) {
+                        for (String tableName : captureCustomerTables) {
                             try {
                                 makeSecondPartStreamEvents(
-                                        getConnection(),
-                                        customDatabase.getDatabaseName()
-                                                + '.'
-                                                + SCHEMA_NAME
-                                                + '.'
-                                                + tableId);
+                                        getConnection(), new TestTableId(SCHEMA_NAME, tableName));
                             } catch (SQLException e) {
                                 throw new RuntimeException(e);
                             }
@@ -1223,12 +1228,12 @@ class PostgresSourceITCase extends PostgresTestBase {
             String[] captureCustomerTables)
             throws Exception {
         waitUntilJobRunning(tableResult);
-        JobID jobId = tableResult.getJobClient().get().getJobID();
+        Optional<JobClient> optionalJobClient = tableResult.getJobClient();
+        assertThat(optionalJobClient).isPresent();
+        JobID jobId = optionalJobClient.get().getJobID();
 
-        for (String tableId : captureCustomerTables) {
-            makeFirstPartStreamEvents(
-                    getConnection(),
-                    customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId);
+        for (String tableName : captureCustomerTables) {
+            makeFirstPartStreamEvents(getConnection(), new TestTableId(SCHEMA_NAME, tableName));
         }
 
         // wait for the stream reading and isCommitOffset is true
@@ -1248,10 +1253,8 @@ class PostgresSourceITCase extends PostgresTestBase {
         }
         // wait for the stream reading and isCommitOffset is true
         Thread.sleep(30000L);
-        for (String tableId : captureCustomerTables) {
-            makeSecondPartStreamEvents(
-                    getConnection(),
-                    customDatabase.getDatabaseName() + '.' + SCHEMA_NAME + '.' + tableId);
+        for (String tableName : captureCustomerTables) {
+            makeSecondPartStreamEvents(getConnection(), new TestTableId(SCHEMA_NAME, tableName));
         }
         Thread.sleep(5000L);
         try (PostgresConnection connection = getConnection()) {
@@ -1301,17 +1304,19 @@ class PostgresSourceITCase extends PostgresTestBase {
      * Make some changes on the specified customer table. Changelog in string could be accessed by
      * {@link #firstPartStreamEvents}.
      */
-    private void makeFirstPartStreamEvents(JdbcConnection connection, String tableId)
+    private void makeFirstPartStreamEvents(JdbcConnection connection, TestTableId tableId)
             throws SQLException {
         try {
             connection.setAutoCommit(false);
 
             // make stream events for the first split
             connection.execute(
-                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
-                    "DELETE FROM " + tableId + " where id = 102",
-                    "INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')",
-                    "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103");
+                    "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 103",
+                    "DELETE FROM " + tableId.toSql() + " where \"Id\" = 102",
+                    "INSERT INTO "
+                            + tableId.toSql()
+                            + " VALUES(102, 'user_2', 'Shanghai', '123567891234')",
+                    "UPDATE " + tableId.toSql() + " SET address = 'Shanghai' where \"Id\" = 103");
             connection.commit();
         } finally {
             connection.close();
@@ -1322,19 +1327,20 @@ class PostgresSourceITCase extends PostgresTestBase {
      * Make some other changes on the specified customer table. Changelog in string could be
      * accessed by {@link #secondPartStreamEvents}.
      */
-    private void makeSecondPartStreamEvents(JdbcConnection connection, String tableId)
+    private void makeSecondPartStreamEvents(JdbcConnection connection, TestTableId tableId)
             throws SQLException {
         try {
             connection.setAutoCommit(false);
 
             // make stream events for split-1
-            connection.execute("UPDATE " + tableId + " SET address = 'Hangzhou' where id = 1010");
+            connection.execute(
+                    "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 1010");
             connection.commit();
 
             // make stream events for the last split
             connection.execute(
                     "INSERT INTO "
-                            + tableId
+                            + tableId.toSql()
                             + " VALUES(2001, 'user_22','Shanghai','123567891234'),"
                             + " (2002, 'user_23','Shanghai','123567891234'),"
                             + "(2003, 'user_24','Shanghai','123567891234')");
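
One pattern repeated throughout the test changes above is worth noting: instead
of calling Optional.get() directly on TableResult.getJobClient(), the tests now
assert presence first, so a missing job client fails with a clear assertion
error rather than a NoSuchElementException. In isolation (imports assumed from
the surrounding test class):

    Optional<JobClient> optionalJobClient = tableResult.getJobClient();
    assertThat(optionalJobClient).isPresent();  // AssertJ presence check
    optionalJobClient.get().cancel().get();     // safe after the assertion
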
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
index f31089606..509064c5f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java
@@ -36,13 +36,13 @@ import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConf
 import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfigFactory;
 import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
 import org.apache.flink.cdc.connectors.postgres.testutils.RecordsFormatter;
+import org.apache.flink.cdc.connectors.postgres.testutils.TestTableId;
 import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
 
 import io.debezium.connector.postgresql.connection.PostgresConnection;
-import io.debezium.relational.TableId;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -60,7 +60,7 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
     private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
 
     private static final String schemaName = "customer";
-    private static final String tableName = "customers";
+    private static final String tableName = "Customers";
 
     private final UniqueDatabase customDatabase =
             new UniqueDatabase(
@@ -74,15 +74,17 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
     void testChangingDataInSnapshotScan() throws Exception {
         customDatabase.createAndInitialize();
 
-        String tableId = schemaName + "." + tableName;
+        TestTableId tableId = new TestTableId(schemaName, tableName);
         String[] changingDataSql =
                 new String[] {
-                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
-                    "DELETE FROM " + tableId + " where id = 102",
-                    "INSERT INTO " + tableId + " VALUES(102, 'user_2','hangzhou','123567891234')",
-                    "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103",
-                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 110",
-                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 111",
+                    "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 103",
+                    "DELETE FROM " + tableId.toSql() + " where \"Id\" = 102",
+                    "INSERT INTO "
+                            + tableId.toSql()
+                            + " VALUES(102, 'user_2','hangzhou','123567891234')",
+                    "UPDATE " + tableId.toSql() + " SET address = 'Shanghai' where \"Id\" = 103",
+                    "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 110",
+                    "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 111",
                 };
 
         String[] expected =
@@ -106,11 +108,15 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
     @Test
     void testInsertDataInSnapshotScan() throws Exception {
         customDatabase.createAndInitialize();
-        String tableId = schemaName + "." + tableName;
+        TestTableId tableId = new TestTableId(schemaName, tableName);
         String[] insertDataSql =
                 new String[] {
-                    "INSERT INTO " + tableId + " VALUES(112, 'user_12','Shanghai','123567891234')",
-                    "INSERT INTO " + tableId + " VALUES(113, 'user_13','Shanghai','123567891234')",
+                    "INSERT INTO "
+                            + tableId.toSql()
+                            + " VALUES(112, 'user_12','Shanghai','123567891234')",
+                    "INSERT INTO "
+                            + tableId.toSql()
+                            + " VALUES(113, 'user_13','Shanghai','123567891234')",
                 };
         String[] expected =
                 new String[] {
@@ -135,11 +141,11 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
     @Test
     void testDeleteDataInSnapshotScan() throws Exception {
         customDatabase.createAndInitialize();
-        String tableId = schemaName + "." + tableName;
+        TestTableId tableId = new TestTableId(schemaName, tableName);
         String[] deleteDataSql =
                 new String[] {
-                    "DELETE FROM " + tableId + " where id = 101",
-                    "DELETE FROM " + tableId + " where id = 102",
+                    "DELETE FROM " + tableId.toSql() + " where \"Id\" = 101",
+                    "DELETE FROM " + tableId.toSql() + " where \"Id\" = 102",
                 };
         String[] expected =
                 new String[] {
@@ -161,15 +167,17 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
     void testSnapshotScanSkipBackfillWithPostLowWatermark() throws Exception {
         customDatabase.createAndInitialize();
 
-        String tableId = schemaName + "." + tableName;
+        TestTableId tableId = new TestTableId(schemaName, tableName);
         String[] changingDataSql =
                 new String[] {
-                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
-                    "DELETE FROM " + tableId + " where id = 102",
-                    "INSERT INTO " + tableId + " VALUES(102, 'user_2','hangzhou','123567891234')",
-                    "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103",
-                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 110",
-                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 111",
+                    "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 103",
+                    "DELETE FROM " + tableId.toSql() + " where \"Id\" = 102",
+                    "INSERT INTO "
+                            + tableId.toSql()
+                            + " VALUES(102, 'user_2','hangzhou','123567891234')",
+                    "UPDATE " + tableId.toSql() + " SET address = 'Shanghai' where \"Id\" = 103",
+                    "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 110",
+                    "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 111",
                 };
 
         String[] expected =
@@ -196,15 +204,17 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
     void testSnapshotScanSkipBackfillWithPreHighWatermark() throws Exception {
         customDatabase.createAndInitialize();
 
-        String tableId = schemaName + "." + tableName;
+        TestTableId tableId = new TestTableId(schemaName, tableName);
         String[] changingDataSql =
                 new String[] {
-                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 103",
-                    "DELETE FROM " + tableId + " where id = 102",
-                    "INSERT INTO " + tableId + " VALUES(102, 'user_2','hangzhou','123567891234')",
-                    "UPDATE " + tableId + " SET address = 'Shanghai' where id = 103",
-                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 110",
-                    "UPDATE " + tableId + " SET address = 'Hangzhou' where id = 111",
+                    "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 103",
+                    "DELETE FROM " + tableId.toSql() + " where \"Id\" = 102",
+                    "INSERT INTO "
+                            + tableId.toSql()
+                            + " VALUES(102, 'user_2','hangzhou','123567891234')",
+                    "UPDATE " + tableId.toSql() + " SET address = 'Shanghai' where \"Id\" = 103",
+                    "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 110",
+                    "UPDATE " + tableId.toSql() + " SET address = 'Hangzhou' where \"Id\" = 111",
                 };
 
         String[] expected =
@@ -261,8 +271,8 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
 
             final DataType dataType =
                     DataTypes.ROW(
-                            DataTypes.FIELD("id", DataTypes.BIGINT()),
-                            DataTypes.FIELD("name", DataTypes.STRING()),
+                            DataTypes.FIELD("Id", DataTypes.BIGINT()),
+                            DataTypes.FIELD("Name", DataTypes.STRING()),
                             DataTypes.FIELD("address", DataTypes.STRING()),
                             DataTypes.FIELD("phone_number", DataTypes.STRING()));
 
@@ -318,7 +328,8 @@ class PostgresScanFetchTaskTest extends PostgresTestBase {
     private List<SnapshotSplit> getSnapshotSplits(
             PostgresSourceConfig sourceConfig, JdbcDataSourceDialect sourceDialect)
             throws Exception {
-        List<TableId> discoverTables = sourceDialect.discoverDataCollections(sourceConfig);
+        List<io.debezium.relational.TableId> discoverTables =
+                sourceDialect.discoverDataCollections(sourceConfig);
         OffsetFactory offsetFactory = new PostgresOffsetFactory();
         final SnapshotSplitAssigner snapshotSplitAssigner =
                 new SnapshotSplitAssigner<JdbcSourceConfig>(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
index 1c855cc3f..5f7ab8dbc 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresSourceReaderTest.java
@@ -79,11 +79,7 @@ class PostgresSourceReaderTest {
         configFactory.password("password");
         configFactory.setLsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay);
         final TestTable customerTable =
-                new TestTable(
-                        "pgdb",
-                        "customer",
-                        "customers",
-                        ResolvedSchema.of(Column.physical("id", BIGINT())));
+                new TestTable(ResolvedSchema.of(Column.physical("id", BIGINT())));
         final DebeziumDeserializationSchema<?> deserializer = customerTable.getDeserializer();
         MockPostgresDialect dialect = new MockPostgresDialect(configFactory.create(0));
         final PostgresSourceBuilder.PostgresIncrementalSource<?> source =
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTable.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTable.java
index d941bb7fa..ddf3c98e5 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTable.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTable.java
@@ -34,11 +34,6 @@ import java.util.List;
  */
 public class TestTable {
 
-    private final String databaseName;
-    private final String tableName;
-
-    private final String schemaName;
-
     private final ResolvedSchema schema;
 
     // Lazily initialized components
@@ -46,23 +41,10 @@ public class TestTable {
     private RowDataDebeziumDeserializeSchema deserializer;
     private RecordsFormatter recordsFormatter;
 
-    public TestTable(
-            UniqueDatabase database, String schemaName, String tableName, ResolvedSchema schema) {
-        this(database.getDatabaseName(), schemaName, tableName, schema);
-    }
-
-    public TestTable(
-            String databaseName, String schemaName, String tableName, ResolvedSchema schema) {
-        this.databaseName = databaseName;
-        this.schemaName = schemaName;
-        this.tableName = tableName;
+    public TestTable(ResolvedSchema schema) {
         this.schema = schema;
     }
 
-    public String getTableId() {
-        return String.format("%s.%s", schemaName, tableName);
-    }
-
     public RowType getRowType() {
         return (RowType) schema.toPhysicalRowDataType().getLogicalType();
     }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTableId.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTableId.java
new file mode 100644
index 000000000..14e6b903a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/testutils/TestTableId.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.postgres.testutils;
+
+import static org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils.quote;
+
+/** Represents a qualified table name. */
+public class TestTableId {
+    private final String schemaName;
+    private final String tableName;
+
+    public TestTableId(String schemaName, String tableName) {
+        this.schemaName = schemaName;
+        this.tableName = tableName;
+    }
+
+    /** Returns the qualified name to be used in connector configuration. */
+    public String toString() {
+        return String.format("%s.%s", schemaName, tableName);
+    }
+
+    /** Returns the qualified name to be used in SQL. */
+    public String toSql() {
+        return String.format("%s.%s", quote(schemaName), quote(tableName));
+    }
+}
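
The split between toString() and toSql() is the point of the new helper: the
connector's tableList() expects the plain qualified name, while SQL statements
need both parts double-quoted to survive case folding. A quick illustration
(output values derived from the methods above; the variable name is
illustrative):

    TestTableId id = new TestTableId("customer", "Customers");
    id.toString(); // customer.Customers       -> connector configuration
    id.toSql();    // "customer"."Customers"   -> SQL statements
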
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/customer.sql
index 52a1810d5..3626589aa 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/customer.sql
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/customer.sql
@@ -18,15 +18,15 @@ CREATE SCHEMA customer;
 SET search_path TO customer;
 
 -- Create and populate our users using a single insert with many rows
-CREATE TABLE customers (
-  id INTEGER NOT NULL PRIMARY KEY,
-  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+CREATE TABLE "Customers" (
+  "Id" INTEGER NOT NULL PRIMARY KEY,
+  "Name" VARCHAR(255) NOT NULL DEFAULT 'flink',
   address VARCHAR(1024),
   phone_number VARCHAR(512)
 );
-ALTER TABLE customers REPLICA IDENTITY FULL;
+ALTER TABLE "Customers" REPLICA IDENTITY FULL;
 
-INSERT INTO customers
+INSERT INTO "Customers"
 VALUES (101,'user_1','Shanghai','123567891234'),
        (102,'user_2','Shanghai','123567891234'),
        (103,'user_3','Shanghai','123567891234'),
@@ -51,8 +51,8 @@ VALUES (101,'user_1','Shanghai','123567891234'),
 
 -- table has same name prefix with 'customers.*'
 CREATE TABLE customers_1 (
-  id INTEGER NOT NULL PRIMARY KEY,
-  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  "Id" INTEGER NOT NULL PRIMARY KEY,
+  "Name" VARCHAR(255) NOT NULL DEFAULT 'flink',
   address VARCHAR(1024),
   phone_number VARCHAR(512)
 );
@@ -82,8 +82,8 @@ VALUES (101,'user_1','Shanghai','123567891234'),
        (2000,'user_21','Shanghai','123567891234');
 
 CREATE TABLE customers_no_pk (
-   id INTEGER NOT NULL,
-   name VARCHAR(255) NOT NULL DEFAULT 'flink',
+   "Id" INTEGER NOT NULL,
+   "Name" VARCHAR(255) NOT NULL DEFAULT 'flink',
    address VARCHAR(1024),
    phone_number VARCHAR(512)
 );
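
The quoted DDL above is what exercises the bug: once the table and key columns
are created with mixed-case quoted names, any generated statement that omits
quoting resolves to a lower-cased name and fails. For example (standard
PostgreSQL case-folding behavior, not part of the patch):

    -- Folded to customer.customers, which does not exist in this schema:
    SELECT * FROM customer.Customers;
    -- Matches the mixed-case table created above:
    SELECT * FROM customer."Customers";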
