This is an automated email from the ASF dual-hosted git repository.
chl-wxp pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 03346dc4df [Bug] [Postgres-CDC] Fix snapshot table schema lookup
(#10843)
03346dc4df is described below
commit 03346dc4dfeadf34ed6b1fd394207c5021080fd4
Author: panda <[email protected]>
AuthorDate: Fri Jun 5 18:15:41 2026 +0800
[Bug] [Postgres-CDC] Fix snapshot table schema lookup (#10843)
---
.../snapshot/PostgresSnapshotSplitReadTask.java | 34 ++++++-
.../PostgresSnapshotSplitReadTaskTest.java | 104 +++++++++++++++++++++
2 files changed, 135 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
index 43ba38eb85..4c9790806c 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java
@@ -166,12 +166,40 @@ public class PostgresSnapshotSplitReadTask
EventDispatcher.SnapshotReceiver snapshotReceiver =
dispatcher.getSnapshotChangeEventReceiver();
log.debug("Snapshotting table {}", tableId);
- TableId newTableId = new TableId(null, tableId.schema(),
tableId.table());
- createDataEventsForTable(
- snapshotContext, snapshotReceiver,
databaseSchema.tableFor(newTableId));
+ createDataEventsForTable(snapshotContext, snapshotReceiver,
resolveTable(tableId));
snapshotReceiver.completeSnapshot();
}
+ private Table resolveTable(TableId tableId) {
+ TableId tableIdWithoutCatalog = new TableId(null, tableId.schema(),
tableId.table());
+ Table table = databaseSchema.tableFor(tableIdWithoutCatalog);
+ if (table != null) {
+ return table;
+ }
+
+ String catalog = tableId.catalog();
+ if (catalog == null || catalog.isEmpty()) {
+ catalog = connectorConfig.databaseName();
+ }
+ if (catalog == null || catalog.isEmpty()) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot find table schema for table %s. Tried
table id: %s.",
+ tableId, tableIdWithoutCatalog));
+ }
+
+ TableId tableIdWithCatalog = new TableId(catalog, tableId.schema(),
tableId.table());
+ table = databaseSchema.tableFor(tableIdWithCatalog);
+ if (table != null) {
+ return table;
+ }
+
+ throw new IllegalStateException(
+ String.format(
+ "Cannot find table schema for table %s. Tried table
ids: %s and %s.",
+ tableId, tableIdWithoutCatalog, tableIdWithCatalog));
+ }
+
/** Dispatches the data change events for the records of a single table. */
private void createDataEventsForTable(
PostgresSnapshotContext snapshotContext,
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTaskTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTaskTest.java
new file mode 100644
index 0000000000..fc27acb233
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTaskTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.connector.postgresql.PostgresConnectorConfig;
+import io.debezium.connector.postgresql.PostgresSchema;
+import io.debezium.pipeline.source.spi.SnapshotProgressListener;
+import io.debezium.relational.Column;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+
+import java.lang.reflect.Method;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PostgresSnapshotSplitReadTaskTest {
+
+ @Test
+ public void testResolveTableFallsBackToCatalogQualifiedTableId() throws
Exception {
+ TableId tableIdWithoutCatalog = new TableId(null, "public", "users");
+ TableId tableIdWithCatalog = new TableId("traffic", "public", "users");
+ Table table = table(tableIdWithCatalog);
+
+ PostgresSchema databaseSchema = mock(PostgresSchema.class);
+ when(databaseSchema.tableFor(tableIdWithoutCatalog)).thenReturn(null);
+ when(databaseSchema.tableFor(tableIdWithCatalog)).thenReturn(table);
+
+ Table resolvedTable =
+ resolveTable(
+ newSnapshotSplitReadTask(
+ mock(PostgresConnectorConfig.class),
databaseSchema),
+ tableIdWithCatalog);
+
+ Assertions.assertSame(table, resolvedTable);
+ }
+
+ @Test
+ public void testResolveTableFallsBackToConfiguredDatabaseName() throws
Exception {
+ TableId tableIdWithoutCatalog = new TableId(null, "public", "users");
+ TableId tableIdWithCatalog = new TableId("traffic", "public", "users");
+ Table table = table(tableIdWithCatalog);
+
+ PostgresConnectorConfig connectorConfig =
mock(PostgresConnectorConfig.class);
+ when(connectorConfig.databaseName()).thenReturn("traffic");
+
+ PostgresSchema databaseSchema = mock(PostgresSchema.class);
+ when(databaseSchema.tableFor(tableIdWithoutCatalog)).thenReturn(null);
+ when(databaseSchema.tableFor(tableIdWithCatalog)).thenReturn(table);
+
+ Table resolvedTable =
+ resolveTable(
+ newSnapshotSplitReadTask(connectorConfig,
databaseSchema),
+ tableIdWithoutCatalog);
+
+ Assertions.assertSame(table, resolvedTable);
+ }
+
+ private static PostgresSnapshotSplitReadTask newSnapshotSplitReadTask(
+ PostgresConnectorConfig connectorConfig, PostgresSchema
databaseSchema) {
+ return new PostgresSnapshotSplitReadTask(
+ connectorConfig,
+ null,
+ mock(SnapshotProgressListener.class),
+ databaseSchema,
+ null,
+ null,
+ null);
+ }
+
+ private static Table resolveTable(PostgresSnapshotSplitReadTask task,
TableId tableId)
+ throws Exception {
+ Method method =
+ PostgresSnapshotSplitReadTask.class.getDeclaredMethod(
+ "resolveTable", TableId.class);
+ method.setAccessible(true);
+ return (Table) method.invoke(task, tableId);
+ }
+
+ private static Table table(TableId tableId) {
+ return Table.editor()
+ .tableId(tableId)
+ .addColumn(Column.editor().name("id").type("int4").create())
+ .create();
+ }
+}