JNSimba opened a new pull request, #4412:
URL: https://github.com/apache/flink-cdc/pull/4412

   This closes FLINK-39748.
   
   The Postgres CDC snapshot path reads column values via a bare 
`rs.getObject(i + 1)` in `PostgresScanFetchTask`. For TIMESTAMP / TIMESTAMPTZ / 
DATE columns, the PG JDBC driver constructs the returned `java.sql.Timestamp` / 
`java.sql.Date` through `GregorianCalendar` (default Julian/Gregorian cutover 
at 1582-10-15) using the JVM default time zone. As a result, pre-cutover dates 
drift by a date-dependent number of days (e.g. `0001-01-01` by 2 days), and a 
local mean time (LMT) delta is added on JVMs whose default zone has an LMT 
segment (e.g. `Asia/Shanghai` is `+08:05:43` until 1901, vs `+08:00:00` after).
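
The two effects described above can be reproduced with the JDK alone, without a database. The following is a minimal sketch, not code from this patch: the class and method names (`CutoverDriftSketch`, `hybridEpochDay`, `driftDays`, `offsetSeconds`) are hypothetical, and it pins `GregorianCalendar` to UTC so that the calendar drift is isolated from the zone effect.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.GregorianCalendar;
import java.util.TimeZone;

// Hypothetical helper, for illustration only.
final class CutoverDriftSketch {
    private static final long MILLIS_PER_DAY = 86_400_000L;

    /** Epoch day of a calendar date as the hybrid Julian/Gregorian calendar resolves it (UTC). */
    static long hybridEpochDay(int year, int month, int day) {
        GregorianCalendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        cal.clear();                            // drop "now"; era defaults to AD
        cal.set(year, month - 1, day, 0, 0, 0); // Calendar months are 0-based
        return Math.floorDiv(cal.getTimeInMillis(), MILLIS_PER_DAY);
    }

    /** Hybrid-calendar epoch day minus proleptic-Gregorian (java.time) epoch day. */
    static long driftDays(int year, int month, int day) {
        return hybridEpochDay(year, month, day) - LocalDate.of(year, month, day).toEpochDay();
    }

    /** UTC offset, in seconds, that the zone rules apply in the middle of the given year. */
    static int offsetSeconds(String zone, int year) {
        return ZoneId.of(zone).getRules()
                .getOffset(LocalDateTime.of(year, 6, 1, 0, 0))
                .getTotalSeconds();
    }
}
```

With this sketch, `driftDays(1, 1, 1)` differs from zero by two days and `driftDays(1582, 10, 15)` is zero, while `offsetSeconds("Asia/Shanghai", 1900)` and `offsetSeconds("Asia/Shanghai", 1901)` differ by the 5 minutes 43 seconds of the LMT segment.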
   
   The Postgres logical decoding (streaming) path does not pass through 
`GregorianCalendar`, so the same row produces different Debezium records on 
snapshot vs streaming, breaking idempotent UPSERT semantics for downstream 
sinks.
   
   This patch:
   
   1. Routes the snapshot path through `PostgresConnection.getColumnValue`, 
which already does per-type dispatch for `MONEY` / `BIT` / `NUMERIC` / `TIME` / 
`TIMETZ`, by replacing the bare `rs.getObject(i + 1)` in 
`PostgresScanFetchTask.createDataEventsForTable` with 
`jdbcConnection.getColumnValue(rs, i + 1, column, table, databaseSchema)`. This 
mirrors how Debezium's own `RelationalSnapshotChangeEventSource` reads rows.
   
   2. Extends the switch in `PostgresConnection.getColumnValue` with three new 
cases for `PgOid.TIMESTAMP` / `TIMESTAMPTZ` / `DATE`, reading the columns as 
`java.time.LocalDateTime` / `OffsetDateTime` / `LocalDate` via 
`rs.getObject(columnIndex, ...class)`. This bypasses `GregorianCalendar`. PG 
`+/-infinity` sentinels are preserved as `Timestamp(Long.MAX/MIN_VALUE)` to 
keep the existing downstream contract.
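
The shape of the new TIMESTAMP case can be sketched as follows. This is an illustration under stated assumptions, not the code in this patch: the class, method, and constant names are hypothetical, the sentinel values follow the description above, and the sketch assumes the PG JDBC driver reports `+/-infinity` as `LocalDateTime.MAX` / `LocalDateTime.MIN` when a column is read as `LocalDateTime`, which should be verified against the driver version in use.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.LocalDateTime;

// Hypothetical helper, for illustration only.
final class TemporalReadSketch {
    // Assumed sentinels, taken from the description of the downstream contract.
    static final Timestamp POSITIVE_INFINITY = new Timestamp(Long.MAX_VALUE);
    static final Timestamp NEGATIVE_INFINITY = new Timestamp(Long.MIN_VALUE);

    /** Reads a TIMESTAMP column as java.time, so no GregorianCalendar or JVM zone is involved. */
    static Object readTimestamp(ResultSet rs, int columnIndex) throws SQLException {
        return normalize(rs.getObject(columnIndex, LocalDateTime.class));
    }

    /** Maps the assumed infinity markers to the sentinels; passes every other value through. */
    static Object normalize(LocalDateTime value) {
        if (value == null) {
            return null; // SQL NULL
        }
        if (value.equals(LocalDateTime.MAX)) {
            return POSITIVE_INFINITY;
        }
        if (value.equals(LocalDateTime.MIN)) {
            return NEGATIVE_INFINITY;
        }
        return value;
    }
}
```

The TIMESTAMPTZ and DATE cases would follow the same pattern with `OffsetDateTime` and `LocalDate`.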
   
   A regression test in `PostgresScanFetchTaskTest` snapshots boundary dates 
(`0001-01-01`, `1582-10-04`, `1582-10-15`, `1900-12-31`, `1901-01-02`, and a 
microsecond-precision value) for TIMESTAMP and DATE columns and asserts the 
produced Debezium record values match the values expected under the proleptic 
Gregorian calendar in UTC.
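
Expected values for such boundary dates can be derived from `java.time`, which uses the proleptic ISO calendar. The sketch below is not the test from this patch: the class and method names are hypothetical, and it assumes DATE is compared as days since the epoch and TIMESTAMP as microseconds since the epoch in UTC. The actual encoding depends on the connector's temporal precision settings.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Hypothetical helper, for illustration only.
final class ProlepticExpectationSketch {
    /** Days since 1970-01-01 for an ISO date, using the proleptic ISO calendar. */
    static long epochDay(String isoDate) {
        return LocalDate.parse(isoDate).toEpochDay();
    }

    /** Microseconds since 1970-01-01T00:00:00Z, treating the local date-time as UTC. */
    static long epochMicros(String isoDateTime) {
        LocalDateTime ldt = LocalDateTime.parse(isoDateTime);
        long seconds = ldt.toEpochSecond(ZoneOffset.UTC); // floors toward negative infinity
        return Math.addExact(Math.multiplyExact(seconds, 1_000_000L), ldt.getNano() / 1_000);
    }
}
```

For example, `epochDay("1582-10-04")` and `epochDay("1582-10-15")` are 11 days apart, with no 10-day gap at the cutover.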

