[ 
https://issues.apache.org/jira/browse/FLINK-39633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39633:
-----------------------------------
    Labels: pull-request-available  (was: )

> PostgreSQL CDC backfill throws NullPointerException when WAL stream carries 
> records for other captured tables
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39633
>                 URL: https://issues.apache.org/jira/browse/FLINK-39633
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.6.0
>            Reporter: Di Wu
>            Priority: Major
>              Labels: pull-request-available
>
> During the snapshot backfill phase of PostgreSQL CDC (incremental snapshot), 
> the WAL
>   stream from the logical replication slot may carry change records for 
> captured tables
>   other than the one whose chunk is currently being snapshotted.              
>                                                 
>   {\{IncrementalSourceScanFetcher#isChangeRecordInChunkRange}} does not 
> filter records by
>   tableId before delegating to 
> \{{JdbcSourceFetchTaskContext#isRecordBetween}}, which calls                  
>                   
>   {\{getDatabaseSchema().tableFor(record.tableId)}}. If that table's schema 
> is not yet                                         
>   present in \{{RelationAwarePostgresSchema}}'s cache, the lookup returns 
> null and                                             
>   {\{ChunkUtils.getSplitColumn}} invokes \{{primaryKeyColumns()}} on null, 
> throwing NPE and                                     
>   aborting the snapshot split.                                                
>                                                 
>                                                                               
>                                                 
>   h2. Stack trace                                                             
>                                                 
>                   
>   {noformat}
>   java.lang.NullPointerException: Cannot invoke 
> "io.debezium.relational.Table.primaryKeyColumns()" because "table" is null
>       at 
> org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils.getSplitColumn(ChunkUtils.java:45)
>                   
>       at 
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext.getSplitType(PostgresSourceFetch
>   TaskContext.java:291)                                                       
>                                                 
>       at 
> org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext.isRecordBetween(JdbcSourceFetc
>   hTaskContext.java:76)                                                       
>                                                 
>       at 
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.isChangeRecordInChunkRange(I
>   ncrementalSourceScanFetcher.java:265)                                       
>                                                 
>       at 
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollWithBuffer(IncrementalSo
>   urceScanFetcher.java:182)                                                   
>                                                 
>       at 
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(Incremental
>   SourceScanFetcher.java:122)                                                 
>                                                 
>   {noformat}      
>                                                                               
>                                                 
>   h2. Root cause 
>  
>   The PostgreSQL logical replication slot streams WAL changes for *all* 
> tables in the
>   publication, regardless of which snapshot split is currently being 
> processed. During
>   backfill (between low and high watermark), \{{pollWithBuffer}} iterates 
> change events and                                    
>   calls \{{isChangeRecordInChunkRange}} on each. The current implementation in
>   {\{flink-cdc-base}} is:                                                     
>                                                  
>                   
>   {code:java}                                                                 
>                                                 
>   private boolean isChangeRecordInChunkRange(SourceRecord record) {
>       if (taskContext.isDataChangeRecord(record)) {                           
>                                                 
>           return taskContext.isRecordBetween(
>                   record,                                                     
>                                                 
>                   currentSnapshotSplit.getSplitStart(),
>                   currentSnapshotSplit.getSplitEnd());                        
>                                                 
>       }
>       return false;                                                           
>                                                 
>   }               
>   {code}
>  
>   It does not verify that the record's tableId matches                        
>                                                 
>   {\{currentSnapshotSplit.getTableId()}}. When a record for another captured 
> table flows
>   through:                                                                    
>                                                 
>                   
>   * \{{JdbcSourceFetchTaskContext#getSplitKey}} calls                         
>                                                  
>     {\{getDatabaseSchema().tableFor(record.tableId)}};
>   * \{{RelationAwarePostgresSchema}} loads relations lazily from 
> \{{Relation}} messages, so                                     
>     for a table whose schema has not yet been observed the lookup returns 
> null;                                               
>   * \{{ChunkUtils.getSplitColumn(null, ...)}} calls 
> \{{null.primaryKeyColumns()}} -> NPE.                                       
>                                                                               
>                                                 
>   Even when the schema happens to be cached, comparing a record from table B 
> against chunk                                    
>   bounds that were derived from table A's primary key is semantically 
> incorrect and would                                     
>   silently pollute the output buffer.                                         
>                                                 
>                   
>   For contrast, the streaming reader                                          
>                                                 
>   (\{{IncrementalSourceStreamFetcher#shouldEmit}}) already defends against 
> this case via
>   {\{finishedSplitsInfo.containsKey(tableId)}} before invoking 
> \{{isRecordBetween}}. The scan                                   
>   fetcher is missing the symmetric tableId guard.                             
>                                                 
>   
>   h2. Reproduction                                                            
>                                                 
>                   
>   * PostgreSQL source with incremental snapshot enabled (default) and
>     {\{scan.incremental.snapshot.backfill.skip = false}}.
>   * Two or more tables included in the publication, e.g. \{{public.table_a}} 
> and                                               
>     {\{public.table_b}}.
>   * While Flink CDC is snapshotting \{{table_a}}, generate INSERT/UPDATE 
> traffic on                                            
>     {\{table_b}} so its WAL records arrive within \{{table_a}}'s backfill 
> window.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to