[
https://issues.apache.org/jira/browse/FLINK-39633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39633:
-----------------------------------
Labels: pull-request-available (was: )
> PostgreSQL CDC backfill throws NullPointerException when WAL stream carries records for other captured tables
> -------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39633
> URL: https://issues.apache.org/jira/browse/FLINK-39633
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.6.0
> Reporter: Di Wu
> Priority: Major
> Labels: pull-request-available
>
> During the snapshot backfill phase of PostgreSQL CDC (incremental snapshot), the WAL stream from the logical replication slot may carry change records for captured tables other than the one whose chunk is currently being snapshotted.
>
> {{IncrementalSourceScanFetcher#isChangeRecordInChunkRange}} does not filter records by tableId before delegating to {{JdbcSourceFetchTaskContext#isRecordBetween}}, which calls {{getDatabaseSchema().tableFor(record.tableId)}}. If that table's schema is not yet present in the {{RelationAwarePostgresSchema}} cache, the lookup returns null, and {{ChunkUtils.getSplitColumn}} invokes {{primaryKeyColumns()}} on null. This throws an NPE and aborts the snapshot split.
>
>
>
> h2. Stack trace
>
>
> {noformat}
> java.lang.NullPointerException: Cannot invoke "io.debezium.relational.Table.primaryKeyColumns()" because "table" is null
> 	at org.apache.flink.cdc.connectors.postgres.source.utils.ChunkUtils.getSplitColumn(ChunkUtils.java:45)
> 	at org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext.getSplitType(PostgresSourceFetchTaskContext.java:291)
> 	at org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext.isRecordBetween(JdbcSourceFetchTaskContext.java:76)
> 	at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.isChangeRecordInChunkRange(IncrementalSourceScanFetcher.java:265)
> 	at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollWithBuffer(IncrementalSourceScanFetcher.java:182)
> 	at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:122)
> {noformat}
>
>
> h2. Root cause
>
> The PostgreSQL logical replication slot streams WAL changes for *all*
> tables in the
> publication, regardless of which snapshot split is currently being
> processed. During
> backfill (between low and high watermark), \{{pollWithBuffer}} iterates
> change events and
> calls \{{isChangeRecordInChunkRange}} on each. The current implementation in
> {\{flink-cdc-base}} is:
>
>
> {code:java}
> private boolean isChangeRecordInChunkRange(SourceRecord record) {
>     if (taskContext.isDataChangeRecord(record)) {
>         // No check that the record belongs to currentSnapshotSplit.getTableId()
>         return taskContext.isRecordBetween(
>                 record,
>                 currentSnapshotSplit.getSplitStart(),
>                 currentSnapshotSplit.getSplitEnd());
>     }
>     return false;
> }
> {code}
>
> It does not verify that the record's tableId matches {{currentSnapshotSplit.getTableId()}}. When a record for another captured table flows through:
>
>
> * {{JdbcSourceFetchTaskContext#getSplitKey}} calls {{getDatabaseSchema().tableFor(record.tableId)}};
> * {{RelationAwarePostgresSchema}} loads relations lazily from {{Relation}} messages, so for a table whose schema has not yet been observed the lookup returns null;
> * {{ChunkUtils.getSplitColumn(null, ...)}} calls {{null.primaryKeyColumns()}} -> NPE.
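The failure chain above can be reproduced outside Flink with a toy model. This is a hypothetical sketch, not connector code: {{ToyTable}}, {{getSplitColumn}} and {{splitColumnFor}} are illustrative stand-ins for Debezium's {{Table}}, {{ChunkUtils.getSplitColumn}} and the {{tableFor}} lookup. The plain {{Map}} models a schema cache that, by assumption, is only populated once a table's {{Relation}} message has been seen.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for io.debezium.relational.Table (only the part needed here).
final class ToyTable {
    private final List<String> primaryKeyColumns;

    ToyTable(List<String> primaryKeyColumns) {
        this.primaryKeyColumns = primaryKeyColumns;
    }

    List<String> primaryKeyColumns() {
        return primaryKeyColumns;
    }
}

final class LazySchemaNpeDemo {
    // Mirrors the shape of ChunkUtils.getSplitColumn: dereferences `table`
    // without a null check. Assumes a single-column primary key.
    static String getSplitColumn(ToyTable table) {
        return table.primaryKeyColumns().get(0);
    }

    // Hypothetical stand-in for tableFor(tableId) followed by getSplitColumn.
    // Map.get returns null for a table whose Relation message was never seen.
    static String splitColumnFor(Map<String, ToyTable> schemaCache, String tableId) {
        return getSplitColumn(schemaCache.get(tableId));
    }

    public static void main(String[] args) {
        Map<String, ToyTable> schemaCache = new HashMap<>();
        schemaCache.put("public.table_a", new ToyTable(List.of("id")));

        System.out.println(splitColumnFor(schemaCache, "public.table_a")); // prints "id"
        try {
            splitColumnFor(schemaCache, "public.table_b"); // schema not cached yet
        } catch (NullPointerException e) {
            System.out.println("NPE: schema for public.table_b not cached");
        }
    }
}
{code}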
>
>
> Even when the schema happens to be cached, comparing a record from table B against chunk bounds derived from table A's primary key is semantically incorrect: any table B change whose key happens to fall within table A's bounds would silently pollute the split's output buffer.
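To make the semantic problem concrete, here is a minimal sketch. It assumes a single-column integer key and a half-open chunk range [start, end). {{KeyRange}}, {{ChangeKey}} and {{inRangeUnguarded}} are hypothetical names, not Flink CDC types. A table_b row with key 150 lands inside table_a's [100, 200) chunk purely because the key values happen to overlap.

{code:java}
// Hypothetical half-open key range [start, end) for one snapshot chunk.
final class KeyRange {
    final String tableId;
    final long start;
    final long end;

    KeyRange(String tableId, long start, long end) {
        this.tableId = tableId;
        this.start = start;
        this.end = end;
    }
}

// Hypothetical change event reduced to its table and single-column key.
final class ChangeKey {
    final String tableId;
    final long key;

    ChangeKey(String tableId, long key) {
        this.tableId = tableId;
        this.key = key;
    }
}

final class CrossTableRangeDemo {
    // Compares only the key and never the table: the same blind spot
    // as the current isChangeRecordInChunkRange.
    static boolean inRangeUnguarded(ChangeKey change, KeyRange chunk) {
        return change.key >= chunk.start && change.key < chunk.end;
    }

    public static void main(String[] args) {
        KeyRange chunkA = new KeyRange("public.table_a", 100, 200);
        ChangeKey changeB = new ChangeKey("public.table_b", 150);
        // Prints "true": table_b's row would be treated as part of table_a's chunk.
        System.out.println(inRangeUnguarded(changeB, chunkA));
    }
}
{code}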
>
>
> For contrast, the streaming reader ({{IncrementalSourceStreamFetcher#shouldEmit}}) already defends against this case via {{finishedSplitsInfo.containsKey(tableId)}} before invoking {{isRecordBetween}}. The scan fetcher is missing the symmetric tableId guard.
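A tableId guard in the scan fetcher could be sketched as follows. This is one possible shape of the fix, not the merged patch. {{SnapshotChunk}} and {{ChangeRecord}} are hypothetical stand-ins for the snapshot split and {{SourceRecord}}, the range is assumed to be half-open, and the code does not show how a tableId is extracted from a real {{SourceRecord}}. The key detail is the ordering: the tableId comparison runs before any key comparison (and, in the real code, before the schema lookup). As a result, records for other tables are skipped by the scan fetcher instead of reaching {{isRecordBetween}}.

{code:java}
import java.util.Objects;

// Hypothetical stand-in for a snapshot split: table plus half-open key range [start, end).
final class SnapshotChunk {
    final String tableId;
    final long start;
    final long end;

    SnapshotChunk(String tableId, long start, long end) {
        this.tableId = tableId;
        this.start = start;
        this.end = end;
    }
}

// Hypothetical stand-in for a data-change SourceRecord with a single-column key.
final class ChangeRecord {
    final String tableId;
    final long key;

    ChangeRecord(String tableId, long key) {
        this.tableId = tableId;
        this.key = key;
    }
}

final class GuardedChunkRangeCheck {
    static boolean isChangeRecordInChunkRange(ChangeRecord record, SnapshotChunk split) {
        // Guard first: a record for another captured table never reaches
        // the key comparison (or, in the real code, the schema lookup).
        if (!Objects.equals(record.tableId, split.tableId)) {
            return false;
        }
        return record.key >= split.start && record.key < split.end;
    }

    public static void main(String[] args) {
        SnapshotChunk chunkA = new SnapshotChunk("public.table_a", 100, 200);
        System.out.println(isChangeRecordInChunkRange(new ChangeRecord("public.table_b", 150), chunkA)); // false
        System.out.println(isChangeRecordInChunkRange(new ChangeRecord("public.table_a", 150), chunkA)); // true
    }
}
{code}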
>
>
> h2. Reproduction
>
>
> * PostgreSQL source with incremental snapshot enabled (the default) and {{scan.incremental.snapshot.backfill.skip = false}}.
> * Two or more tables included in the publication, e.g. {{public.table_a}} and {{public.table_b}}.
> * While Flink CDC is snapshotting {{table_a}}, generate INSERT/UPDATE traffic on {{table_b}} so that its WAL records arrive within {{table_a}}'s backfill window.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)