This is an automated email from the ASF dual-hosted git repository.

chl-wxp pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 03346dc4df [Bug] [Postgres-CDC] Fix snapshot table schema lookup 
(#10843)
03346dc4df is described below

commit 03346dc4dfeadf34ed6b1fd394207c5021080fd4
Author: panda <[email protected]>
AuthorDate: Fri Jun 5 18:15:41 2026 +0800

    [Bug] [Postgres-CDC] Fix snapshot table schema lookup (#10843)
---
 .../snapshot/PostgresSnapshotSplitReadTask.java    |  34 ++++++-
 .../PostgresSnapshotSplitReadTaskTest.java         | 104 +++++++++++++++++++++
 2 files changed, 135 insertions(+), 3 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
index 43ba38eb85..4c9790806c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
@@ -166,12 +166,40 @@ public class PostgresSnapshotSplitReadTask
         EventDispatcher.SnapshotReceiver snapshotReceiver =
                 dispatcher.getSnapshotChangeEventReceiver();
         log.debug("Snapshotting table {}", tableId);
-        TableId newTableId = new TableId(null, tableId.schema(), 
tableId.table());
-        createDataEventsForTable(
-                snapshotContext, snapshotReceiver, 
databaseSchema.tableFor(newTableId));
+        createDataEventsForTable(snapshotContext, snapshotReceiver, 
resolveTable(tableId));
         snapshotReceiver.completeSnapshot();
     }
 
+    private Table resolveTable(TableId tableId) {
+        TableId tableIdWithoutCatalog = new TableId(null, tableId.schema(), 
tableId.table());
+        Table table = databaseSchema.tableFor(tableIdWithoutCatalog);
+        if (table != null) {
+            return table;
+        }
+
+        String catalog = tableId.catalog();
+        if (catalog == null || catalog.isEmpty()) {
+            catalog = connectorConfig.databaseName();
+        }
+        if (catalog == null || catalog.isEmpty()) {
+            throw new IllegalStateException(
+                    String.format(
+                            "Cannot find table schema for table %s. Tried 
table id: %s.",
+                            tableId, tableIdWithoutCatalog));
+        }
+
+        TableId tableIdWithCatalog = new TableId(catalog, tableId.schema(), 
tableId.table());
+        table = databaseSchema.tableFor(tableIdWithCatalog);
+        if (table != null) {
+            return table;
+        }
+
+        throw new IllegalStateException(
+                String.format(
+                        "Cannot find table schema for table %s. Tried table 
ids: %s and %s.",
+                        tableId, tableIdWithoutCatalog, tableIdWithCatalog));
+    }
+
     /** Dispatches the data change events for the records of a single table. */
     private void createDataEventsForTable(
             PostgresSnapshotContext snapshotContext,
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTaskTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTaskTest.java
new file mode 100644
index 0000000000..fc27acb233
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTaskTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.pipeline.source.spi.SnapshotProgressListener;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+
+import java.lang.reflect.Method;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PostgresSnapshotSplitReadTaskTest {
+
+    @Test
+    public void testResolveTableFallsBackToCatalogQualifiedTableId() throws 
Exception {
+        TableId tableIdWithoutCatalog = new TableId(null, "public", "users");
+        TableId tableIdWithCatalog = new TableId("traffic", "public", "users");
+        Table table = table(tableIdWithCatalog);
+
+        PostgresSchema databaseSchema = mock(PostgresSchema.class);
+        when(databaseSchema.tableFor(tableIdWithoutCatalog)).thenReturn(null);
+        when(databaseSchema.tableFor(tableIdWithCatalog)).thenReturn(table);
+
+        Table resolvedTable =
+                resolveTable(
+                        newSnapshotSplitReadTask(
+                                mock(PostgresConnectorConfig.class), 
databaseSchema),
+                        tableIdWithCatalog);
+
+        Assertions.assertSame(table, resolvedTable);
+    }
+
+    @Test
+    public void testResolveTableFallsBackToConfiguredDatabaseName() throws 
Exception {
+        TableId tableIdWithoutCatalog = new TableId(null, "public", "users");
+        TableId tableIdWithCatalog = new TableId("traffic", "public", "users");
+        Table table = table(tableIdWithCatalog);
+
+        PostgresConnectorConfig connectorConfig = 
mock(PostgresConnectorConfig.class);
+        when(connectorConfig.databaseName()).thenReturn("traffic");
+
+        PostgresSchema databaseSchema = mock(PostgresSchema.class);
+        when(databaseSchema.tableFor(tableIdWithoutCatalog)).thenReturn(null);
+        when(databaseSchema.tableFor(tableIdWithCatalog)).thenReturn(table);
+
+        Table resolvedTable =
+                resolveTable(
+                        newSnapshotSplitReadTask(connectorConfig, 
databaseSchema),
+                        tableIdWithoutCatalog);
+
+        Assertions.assertSame(table, resolvedTable);
+    }
+
+    private static PostgresSnapshotSplitReadTask newSnapshotSplitReadTask(
+            PostgresConnectorConfig connectorConfig, PostgresSchema 
databaseSchema) {
+        return new PostgresSnapshotSplitReadTask(
+                connectorConfig,
+                null,
+                mock(SnapshotProgressListener.class),
+                databaseSchema,
+                null,
+                null,
+                null);
+    }
+
+    private static Table resolveTable(PostgresSnapshotSplitReadTask task, 
TableId tableId)
+            throws Exception {
+        Method method =
+                PostgresSnapshotSplitReadTask.class.getDeclaredMethod(
+                        "resolveTable", TableId.class);
+        method.setAccessible(true);
+        return (Table) method.invoke(task, tableId);
+    }
+
+    private static Table table(TableId tableId) {
+        return Table.editor()
+                .tableId(tableId)
+                .addColumn(Column.editor().name("id").type("int4").create())
+                .create();
+    }
+}

Reply via email to