cansakiroglu opened a new pull request, #4437:
URL: https://github.com/apache/flink-cdc/pull/4437
FLINK-38647 reports a NullPointerException from the Postgres Pipeline source when an UPDATE arrives for a captured table that has REPLICA IDENTITY DEFAULT. Root cause: PostgresDataSource hardcodes DebeziumChangelogMode.ALL, which makes the deserializer extract a before-image. Under REPLICA IDENTITY DEFAULT that before-image is null, so accessing it throws the NPE.

This change fixes the issue from two angles:

1. Connector side: expose a 'changelog-mode' YAML option on the Postgres Pipeline connector. It accepts 'all' (the default, current behaviour) or 'upsert'. In upsert mode the source emits UPDATE events with before == null and only after populated, so the pipeline runs cleanly under REPLICA IDENTITY DEFAULT without requiring REPLICA IDENTITY FULL (which increases WAL volume).

2. Runtime side: make the serializer null-tolerant. Three changes:
   - copy() null-guards each recordDataSerializer.copy() call, so the chained CopyingChainingOutput.pushToOperator path no longer throws an NPE on a null before or after.
   - serialize() writes two leading boolean presence flags before conditionally writing each record. This is required because the previous serialize() already skipped writing null fields, while deserialize() always tried to read two records back.
   - deserialize() reads the two flag bytes and skips the corresponding read when a record is absent.

--
This is an automated message from the Apache Git Service.
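The connector-side option can be pictured with a small, self-contained Java sketch. Everything in it (ChangelogModeSketch, the ChangelogMode enum, the UpdateEvent record, and the choice to throw IllegalStateException) is hypothetical and only illustrates the contract described in this PR; it is not the connector's actual DebeziumChangelogMode handling. The sketch assumes that 'all' mode needs an UPDATE's before-image, while 'upsert' mode always drops it and keeps only the after-image.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the 'changelog-mode' contract; not the connector's real classes.
final class ChangelogModeSketch {

    // Stand-in for the two accepted option values: 'all' (default) and 'upsert'.
    enum ChangelogMode {
        ALL,
        UPSERT;

        // Parses the YAML string value case-insensitively; unknown values throw IllegalArgumentException.
        static ChangelogMode fromOption(String value) {
            return valueOf(value.trim().toUpperCase(Locale.ROOT));
        }
    }

    // Simplified UPDATE event: column name -> value maps for the before- and after-images.
    record UpdateEvent(Map<String, Object> before, Map<String, Object> after) {}

    static UpdateEvent toUpdateEvent(
            ChangelogMode mode, Map<String, Object> before, Map<String, Object> after) {
        Objects.requireNonNull(after, "an UPDATE always carries an after-image");
        return switch (mode) {
            case ALL -> {
                // ALL needs a before-image. Under REPLICA IDENTITY DEFAULT it is typically
                // missing, so this sketch fails loudly instead of dereferencing null later.
                if (before == null) {
                    throw new IllegalStateException(
                            "changelog-mode 'all' needs a before-image; "
                                    + "use REPLICA IDENTITY FULL or changelog-mode 'upsert'");
                }
                yield new UpdateEvent(before, after);
            }
            // UPSERT always drops the before-image and keeps only the after-image.
            case UPSERT -> new UpdateEvent(null, after);
        };
    }
}
```

In a pipeline definition the option would presumably be set next to the other Postgres source options as changelog-mode: upsert; the merged connector documentation is the authority on the exact key and placement.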
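The runtime-side fix comes down to a presence-flag encoding for two nullable fields. The Java sketch below assumes a hypothetical NullablePairSerializer whose records are plain byte[] payloads and which uses java.io.DataOutput/DataInput rather than Flink's DataOutputView/DataInputView. It follows the three runtime changes listed in the PR (null-guarded copy, two leading flags on write, flag-driven reads) but is not the actual serializer code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical sketch: a change with nullable before/after images, encoded with presence flags.
final class NullablePairSerializer {

    // Simplified change event; either image may be null (e.g. before == null in upsert mode).
    record Change(byte[] before, byte[] after) {}

    // Null-guarded deep copy: each image is copied only when present.
    static Change copy(Change c) {
        return new Change(copyOrNull(c.before()), copyOrNull(c.after()));
    }

    private static byte[] copyOrNull(byte[] b) {
        return b == null ? null : Arrays.copyOf(b, b.length);
    }

    // Writes two leading presence flags, then only the images that are non-null.
    static void serialize(Change c, DataOutput out) throws IOException {
        out.writeBoolean(c.before() != null);
        out.writeBoolean(c.after() != null);
        if (c.before() != null) writeBytes(c.before(), out);
        if (c.after() != null) writeBytes(c.after(), out);
    }

    // Reads the two flags first, then skips the read for any absent image.
    static Change deserialize(DataInput in) throws IOException {
        boolean hasBefore = in.readBoolean();
        boolean hasAfter = in.readBoolean();
        byte[] before = hasBefore ? readBytes(in) : null;
        byte[] after = hasAfter ? readBytes(in) : null;
        return new Change(before, after);
    }

    private static void writeBytes(byte[] b, DataOutput out) throws IOException {
        out.writeInt(b.length); // length prefix so the reader knows how many bytes follow
        out.write(b);
    }

    private static byte[] readBytes(DataInput in) throws IOException {
        byte[] b = new byte[in.readInt()];
        in.readFully(b);
        return b;
    }

    // Convenience helper: serialize into memory and read the result back.
    static Change roundTrip(Change c) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            serialize(c, out);
            out.flush();
            return deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Each writeBoolean call writes a single byte, so the encoding costs two extra bytes per event; in exchange, deserialize() knows exactly which records follow and never tries to read a record that was not written.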
