cansakiroglu opened a new pull request, #4437:
URL: https://github.com/apache/flink-cdc/pull/4437

   FLINK-38647 reports a NullPointerException from the Postgres Pipeline source 
when a captured table has REPLICA IDENTITY DEFAULT and an UPDATE arrives. Root 
cause: PostgresDataSource hardcodes DebeziumChangelogMode.ALL, so the 
deserializer tries to extract a before-image, which is null under REPLICA 
IDENTITY DEFAULT, and the pipeline fails with an NPE.
   
   This change fixes the issue from two angles:
   
   1. Connector side: expose a 'changelog-mode' YAML option on the Postgres 
Pipeline connector. It accepts 'all' (the default, matching current behaviour) 
or 'upsert'. In upsert mode the source emits UPDATE events with before == null 
and only after populated, so the pipeline runs cleanly under REPLICA IDENTITY 
DEFAULT without requiring FULL (which increases WAL volume).
   
   2. Runtime side: make the serializer null-tolerant. Three changes:
   
   - copy() null-guards each recordDataSerializer.copy() call, so the chained 
CopyingChainingOutput.pushToOperator path no longer throws an NPE on a null 
before or after.
   - serialize() writes two leading boolean presence flags before conditionally 
writing each record. This is required because the previous serialize() already 
skipped null fields, while deserialize() always tried to read two records back.
   - deserialize() reads the two flag bytes and skips the corresponding read 
when a record is absent.
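
   The three serializer changes above can be illustrated with a minimal, 
standalone sketch. It is not the code in this PR: PresenceFlagSketch, Pair and 
copyRecord are hypothetical names, plain strings stand in for the record data, 
and java.io streams stand in for Flink's data views. It only shows the 
presence-flag layout and the null guards, under those assumptions.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical illustration of a null-tolerant before/after serializer. */
final class PresenceFlagSketch {

    /** Simplified change event payload; either side may be null. */
    static final class Pair {
        final String before;
        final String after;

        Pair(String before, String after) {
            this.before = before;
            this.after = after;
        }
    }

    /** Writes two presence flags first, then only the records that exist. */
    static byte[] serialize(Pair pair) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(pair.before != null);
            out.writeBoolean(pair.after != null);
            if (pair.before != null) {
                out.writeUTF(pair.before);
            }
            if (pair.after != null) {
                out.writeUTF(pair.after);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the two flags, then reads a record only when its flag is set. */
    static Pair deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            boolean hasBefore = in.readBoolean();
            boolean hasAfter = in.readBoolean();
            String before = hasBefore ? in.readUTF() : null;
            String after = hasAfter ? in.readUTF() : null;
            return new Pair(before, after);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Copies each side only when it is present, so a null side stays null. */
    static Pair copy(Pair from) {
        String before = from.before == null ? null : copyRecord(from.before);
        String after = from.after == null ? null : copyRecord(from.after);
        return new Pair(before, after);
    }

    /** Stand-in for the per-record copy; assumed to reject null input. */
    private static String copyRecord(String record) {
        return new String(record);
    }
}
```

   In this sketch an upsert-style UPDATE such as new Pair(null, "id=1") 
round-trips through serialize() and deserialize() with before still null, 
because the reader consumes exactly the records the flags announce.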

