This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new db27ba6ae [hotfix][postgres] Use correct table filter during backfill db27ba6ae is described below commit db27ba6ae8ccc0be1e6464835790613d02639ecd Author: hql0312 <156626757+hql0...@users.noreply.github.com> AuthorDate: Wed Sep 10 10:50:48 2025 +0800 [hotfix][postgres] Use correct table filter during backfill This closes #3985. --- .../fetch/PostgresSourceFetchTaskContext.java | 27 +++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java index 68c0dec00..e57a73317 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory; import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset; import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit; import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase; +import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit; import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext; import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect; import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig; @@ -33,6 +34,7 @@ import org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils; import org.apache.flink.table.types.logical.RowType; import io.debezium.DebeziumException; +import io.debezium.config.Configuration; import io.debezium.connector.base.ChangeEventQueue; import io.debezium.connector.postgresql.PostgresConnectorConfig; import io.debezium.connector.postgresql.PostgresErrorHandler; @@ -143,11 +145,24 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext { .with(Heartbeat.HEARTBEAT_INTERVAL, 0) .build()); } else { + + Configuration.Builder builder = dbzConfig.getConfig().edit(); + if (isBackFillSplit(sourceSplitBase)) { + // when backfilled split, only current table schema should be scan + builder.with( + "table.include.list", + sourceSplitBase + .asStreamSplit() + .getTableSchemas() + .keySet() + .iterator() + .next() + .toString()); + } + dbzConfig = new PostgresConnectorConfig( - dbzConfig - .getConfig() - .edit() + builder // never drop slot for stream split, which is also global split .with(DROP_SLOT_ON_STOP.name(), false) .build()); @@ -366,4 +381,10 @@ public class PostgresSourceFetchTaskContext extends JdbcSourceFetchTaskContext { sourceConfig.getDbzProperties().getProperty(PLUGIN_NAME.name())) .getPostgresPluginName(); } + + private boolean isBackFillSplit(SourceSplitBase sourceSplitBase) { + return sourceSplitBase.isStreamSplit() + && !StreamSplit.STREAM_SPLIT_ID.equalsIgnoreCase( + sourceSplitBase.asStreamSplit().splitId()); + } }