This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new db27ba6ae [hotfix][postgres] Use correct table filter during backfill
db27ba6ae is described below

commit db27ba6ae8ccc0be1e6464835790613d02639ecd
Author: hql0312 <156626757+hql0...@users.noreply.github.com>
AuthorDate: Wed Sep 10 10:50:48 2025 +0800

    [hotfix][postgres] Use correct table filter during backfill
    
    This closes #3985.
---
 .../fetch/PostgresSourceFetchTaskContext.java      | 27 +++++++++++++++++++---
 1 file changed, 24 insertions(+), 3 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
index 68c0dec00..e57a73317 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresSourceFetchTaskContext.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.cdc.connectors.base.source.EmbeddedFlinkDatabaseHistory;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
 import 
org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
 import org.apache.flink.cdc.connectors.postgres.source.PostgresDialect;
 import 
org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceConfig;
@@ -33,6 +34,7 @@ import 
org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils;
 import org.apache.flink.table.types.logical.RowType;
 
 import io.debezium.DebeziumException;
+import io.debezium.config.Configuration;
 import io.debezium.connector.base.ChangeEventQueue;
 import io.debezium.connector.postgresql.PostgresConnectorConfig;
 import io.debezium.connector.postgresql.PostgresErrorHandler;
@@ -143,11 +145,24 @@ public class PostgresSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
                                     .with(Heartbeat.HEARTBEAT_INTERVAL, 0)
                                     .build());
         } else {
+
+            Configuration.Builder builder = dbzConfig.getConfig().edit();
+            if (isBackFillSplit(sourceSplitBase)) {
+                // when this is a backfill split, only the current table's schema should be 
scanned
+                builder.with(
+                        "table.include.list",
+                        sourceSplitBase
+                                .asStreamSplit()
+                                .getTableSchemas()
+                                .keySet()
+                                .iterator()
+                                .next()
+                                .toString());
+            }
+
             dbzConfig =
                     new PostgresConnectorConfig(
-                            dbzConfig
-                                    .getConfig()
-                                    .edit()
+                            builder
                                     // never drop slot for stream split, which 
is also global split
                                     .with(DROP_SLOT_ON_STOP.name(), false)
                                     .build());
@@ -366,4 +381,10 @@ public class PostgresSourceFetchTaskContext extends 
JdbcSourceFetchTaskContext {
                         
sourceConfig.getDbzProperties().getProperty(PLUGIN_NAME.name()))
                 .getPostgresPluginName();
     }
+
+    private boolean isBackFillSplit(SourceSplitBase sourceSplitBase) {
+        return sourceSplitBase.isStreamSplit()
+                && !StreamSplit.STREAM_SPLIT_ID.equalsIgnoreCase(
+                        sourceSplitBase.asStreamSplit().splitId());
+    }
 }

Reply via email to