JNSimba opened a new pull request, #4390:
URL: https://github.com/apache/flink-cdc/pull/4390

   This closes https://issues.apache.org/jira/browse/FLINK-39633.
   
   ## Problem
   
   During the snapshot backfill phase of PostgreSQL CDC (incremental snapshot), 
the WAL stream from the logical replication slot may carry change records for 
captured tables other than the one whose chunk is currently being snapshotted. 
`IncrementalSourceScanFetcher#isChangeRecordInChunkRange` does not filter 
records by tableId before delegating to 
`JdbcSourceFetchTaskContext#isRecordBetween`, which calls 
`getDatabaseSchema().tableFor(record.tableId)`. If the schema of such a foreign 
table (a captured table other than the current split's) is not yet present in 
`RelationAwarePostgresSchema`'s lazy cache, the lookup returns null and 
`ChunkUtils.getSplitColumn` throws a `NullPointerException` when it calls 
`primaryKeyColumns()` on the null table:
   
   ```
   java.lang.NullPointerException: Cannot invoke "io.debezium.relational.Table.primaryKeyColumns()" because "table" is null
       at org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils.getSplitColumn(ChunkUtils.java:45)
       at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext.getSplitType(PostgresSourceFetchTaskContext.java:291)
       at org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext.isRecordBetween(JdbcSourceFetchTaskContext.java:76)
       at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.isChangeRecordInChunkRange(IncrementalSourceScanFetcher.java:265)
   ```
   
   The streaming reader (`IncrementalSourceStreamFetcher#shouldEmit`) already 
guards against this case via `finishedSplitsInfo.containsKey(tableId)` before 
invoking `isRecordBetween`. The scan fetcher was missing the symmetric guard.
   
   ## Fix
   
   Filter change records by tableId in 
`IncrementalSourceScanFetcher#isChangeRecordInChunkRange` before delegating to 
`isRecordBetween`. Records whose tableId does not equal 
`currentSnapshotSplit.getTableId()` are skipped, so `isRecordBetween` (and the 
schema lookup behind it) is never invoked for them.
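   
   For reviewers, the shape of the guard can be sketched in isolation. This is 
a simplified, self-contained illustration and not the code in this PR: 
`ChangeRecord`, `SnapshotSplit`, `schemaCache`, and the integer key range are 
hypothetical stand-ins for `SourceRecord`, the real snapshot split, and 
`RelationAwarePostgresSchema`. Returning `false` for non-data records and 
treating the chunk as a half-open range are assumptions of the sketch.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class TableIdGuardSketch {

       /** Hypothetical, simplified change record: owning table, split-key value, data-change flag. */
       static final class ChangeRecord {
           final String tableId;
           final int key;
           final boolean dataChange;

           ChangeRecord(String tableId, int key, boolean dataChange) {
               this.tableId = tableId;
               this.key = key;
               this.dataChange = dataChange;
           }
       }

       /** Hypothetical snapshot split: the table it covers and its key range [start, end). */
       static final class SnapshotSplit {
           final String tableId;
           final int start;
           final int end;

           SnapshotSplit(String tableId, int start, int end) {
               this.tableId = tableId;
               this.start = start;
               this.end = end;
           }
       }

       /** Stand-in for the lazy schema cache: table id -> split column name. */
       static final Map<String, String> schemaCache = new HashMap<>();

       /** Stand-in for isRecordBetween: fails like the reported NPE when the table is not cached. */
       static boolean isRecordBetween(ChangeRecord record, SnapshotSplit split) {
           String splitColumn = schemaCache.get(record.tableId);
           if (splitColumn == null) {
               throw new NullPointerException("table is null: " + record.tableId);
           }
           return record.key >= split.start && record.key < split.end;
       }

       /** Guarded check: only records of the split's own table reach isRecordBetween. */
       static boolean isChangeRecordInChunkRange(ChangeRecord record, SnapshotSplit split) {
           if (!record.dataChange) {
               // Assumption: watermark / signal records are never "in range" here.
               return false;
           }
           if (!split.tableId.equals(record.tableId)) {
               // The guard: a record from another captured table is skipped
               // before any schema lookup can happen.
               return false;
           }
           return isRecordBetween(record, split);
       }

       public static void main(String[] args) {
           schemaCache.put("public.orders", "id");
           SnapshotSplit split = new SnapshotSplit("public.orders", 0, 100);

           // Same table, key inside the chunk: delegated to isRecordBetween.
           System.out.println(isChangeRecordInChunkRange(
                   new ChangeRecord("public.orders", 42, true), split));
           // Other table, schema not cached: filtered out instead of throwing.
           System.out.println(isChangeRecordInChunkRange(
                   new ChangeRecord("public.customers", 42, true), split));
       }
   }
   ```
   
   In this sketch the table check runs before the range check, so a record 
from a table missing from the cache never reaches the lookup that would 
dereference null.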
   
   ## Test plan
   
   - [x] Added `IncrementalSourceScanFetcherTest` with three cases:
     - foreign-table change record is filtered out (no NPE, `isRecordBetween` 
is never invoked)
     - same-table change record is still delegated to `isRecordBetween`
     - non-data change records (watermark / signal) short-circuit without 
calling `getTableId`
   - [x] All existing `flink-cdc-base` unit tests still pass (16/16)

