Di Wu created FLINK-39748:
-----------------------------

             Summary: Postgres CDC snapshot produces wrong values for TIMESTAMP / TIMESTAMPTZ / DATE columns with historical dates
                 Key: FLINK-39748
                 URL: https://issues.apache.org/jira/browse/FLINK-39748
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
    Affects Versions: cdc-3.6.0
            Reporter: Di Wu


*Description*

*Problem*

When the Postgres CDC source snapshots rows containing TIMESTAMP, TIMESTAMPTZ, or DATE column values dated before the Julian/Gregorian cutover (1582-10-15), the emitted Debezium records carry numerically wrong timestamps. On JVMs whose default time zone has a Local Mean Time period, values before the end of that period (1901-01-01 for Asia/Shanghai) can be affected as well. When the same row is later re-emitted via the streaming (logical decoding) path, it carries the correct value, so snapshot and streaming records for the same primary key disagree, breaking idempotent UPSERT downstream and causing phantom updates.

Example: on a JVM whose default time zone is Asia/Shanghai, snapshotting a row whose TIMESTAMP column holds '0001-01-01 00:00:00' produces a Debezium MicroTimestamp of -62135769257000000 instead of the proleptic-UTC-correct -62135596800000000. The emitted value is 172,457 seconds too early, i.e. a drift of 2 days minus 343 seconds.
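
The arithmetic in this example can be checked with java.time alone. Below is a minimal sketch (the class and method names are illustrative, not Flink CDC or Debezium API) that computes the proleptic-UTC MicroTimestamp for the wall-clock value and splits the reported drift into days and seconds. The observed value is copied from this issue, not produced by the code.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class MicroTimestampDrift {

    static final long MICROS_PER_SECOND = 1_000_000L;
    static final long SECONDS_PER_DAY = 86_400L;

    /** Microseconds since the epoch for a wall-clock value read as UTC in the proleptic Gregorian calendar. */
    static long prolepticUtcMicros(LocalDateTime wallClock) {
        return wallClock.toEpochSecond(ZoneOffset.UTC) * MICROS_PER_SECOND
                + wallClock.getNano() / 1_000L;
    }

    public static void main(String[] args) {
        long expected = prolepticUtcMicros(LocalDateTime.of(1, 1, 1, 0, 0));
        long observed = -62_135_769_257_000_000L; // snapshot value reported in this issue (Asia/Shanghai JVM)
        long driftSeconds = (observed - expected) / MICROS_PER_SECOND;
        System.out.println("expected micros: " + expected);     // -62135596800000000
        System.out.println("drift: " + driftSeconds + " s = -2 days + "
                + (driftSeconds + 2 * SECONDS_PER_DAY) + " s"); // drift: -172457 s = -2 days + 343 s
    }
}
```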

*Root Cause*

PostgresScanFetchTask#createDataEventsForTable reads column values via a bare rs.getObject(i + 1). For temporal columns, the PG JDBC driver returns java.sql.Timestamp / java.sql.Date instances constructed through java.util.GregorianCalendar (default Julian/Gregorian cutover at 1582-10-15) in the JVM default time zone. This introduces two independent drifts:

1. *Julian/Proleptic-Gregorian cutover*: values before 1582-10-15 are interpreted as Julian dates, while PostgreSQL stores proleptic Gregorian. The size of the shift depends on the year (2 days at year 0001, 10 days just before the cutover).
2. *Local Mean Time (LMT)*: on JVMs whose default time zone has an LMT segment (e.g. Asia/Shanghai is LMT +08:05:43 until 1901-01-01), values are further offset by the LMT delta (343 seconds for Shanghai).
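
Both drifts can be observed without a database. The following minimal sketch (hypothetical class and method names; it does not call the PG JDBC driver) computes the same wall-clock fields once through java.util.GregorianCalendar with its default cutover and once through java.time's proleptic ISO calendar, both in UTC so that only the Julian shift remains, and then asks java.time's zone rules for the Asia/Shanghai offset at year 0001. The LMT figure assumes a JDK whose bundled tzdata defines Asia/Shanghai as LMT +08:05:43 until 1901; other tzdata versions may report a different value.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.GregorianCalendar;
import java.util.TimeZone;

public class CalendarDriftDemo {

    static final long MILLIS_PER_DAY = 86_400_000L;

    /** Epoch millis of y-m-d 00:00 UTC via the legacy hybrid Julian/Gregorian calendar. */
    static long legacyUtcMillis(int year, int month, int day) {
        GregorianCalendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        cal.clear();                             // reset all fields, including milliseconds
        cal.set(year, month - 1, day, 0, 0, 0);  // Calendar months are 0-based
        return cal.getTimeInMillis();
    }

    /** Epoch millis of the same fields in the proleptic Gregorian (ISO) calendar, UTC. */
    static long prolepticUtcMillis(int year, int month, int day) {
        return LocalDateTime.of(year, month, day, 0, 0).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** Seconds by which a zone's historical offset at a wall-clock time differs from a nominal offset. */
    static int offsetDeltaSeconds(String zone, LocalDateTime wallClock, int nominalOffsetSeconds) {
        ZoneOffset actual = ZoneId.of(zone).getRules().getOffset(wallClock);
        return actual.getTotalSeconds() - nominalOffsetSeconds;
    }

    public static void main(String[] args) {
        System.out.println("Julian shift at 0001-01-01: "
                + (legacyUtcMillis(1, 1, 1) - prolepticUtcMillis(1, 1, 1)) / MILLIS_PER_DAY + " days");
        System.out.println("Julian shift at 1582-10-04: "
                + (legacyUtcMillis(1582, 10, 4) - prolepticUtcMillis(1582, 10, 4)) / MILLIS_PER_DAY + " days");
        System.out.println("Asia/Shanghai LMT delta vs +08:00: "
                + offsetDeltaSeconds("Asia/Shanghai", LocalDateTime.of(1, 1, 1, 0, 0), 8 * 3600) + " s");
    }
}
```

The sign of the Julian shift changes over time: at 0001-01-01 the legacy value is 2 days earlier than the proleptic one, at 1582-10-04 it is 10 days later, and at 1582-10-15 the two calendars agree.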

The streaming path goes through Postgres logical decoding, which does not pass through GregorianCalendar, so the bug is silent until snapshot and streaming records for the same row are compared.

Additionally, by calling rs.getObject directly, PostgresScanFetchTask bypasses the per-type dispatch in PostgresConnection#getColumnValue (which already handles MONEY / BIT / NUMERIC / TIME / TIMETZ correctly). Debezium's own RelationalSnapshotChangeEventSource#createDataEventsForTable, by contrast, delegates to jdbcConnection.rowToArray → getColumnValue.

*Reproduction*

1. Create a Postgres table with a TIMESTAMP column.
2. Insert '0001-01-01 00:00:00'.
3. Start a Flink CDC Postgres source.
4. Compare the snapshot record's MicroTimestamp value with the value for the same row re-captured via logical decoding (e.g. by updating the row and re-reading it).
5. The two micro values differ by 2 days minus 343 seconds on an Asia/Shanghai JVM, or by exactly 2 days on a UTC JVM.

*Proposed Fix*

1. In PostgresScanFetchTask, replace rs.getObject(i + 1) with jdbcConnection.getColumnValue(rs, i + 1, column, table, databaseSchema) so that the snapshot path goes through the same per-type dispatch already used by Debezium's snapshot framework.
2. In PostgresConnection#getColumnValue, extend the existing switch (type.getOid()) to also handle PgOid.TIMESTAMP / TIMESTAMPTZ / DATE, reading them as java.time.LocalDateTime / OffsetDateTime / LocalDate via rs.getObject(columnIndex, ...class). This bypasses the legacy GregorianCalendar path while preserving the existing ±infinity sentinel contract by mapping LocalDateTime.MAX/MIN and OffsetDateTime.MAX/MIN back to Timestamp(Long.MAX_VALUE / Long.MIN_VALUE).
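
Step 2 can be sketched as follows. This is not the actual PostgresConnection#getColumnValue code: the OID constants are PostgreSQL's built-in pg_type OIDs written out by hand instead of PgOid, and the getObjectAs function is a hypothetical stand-in for cls -> rs.getObject(columnIndex, cls), so that the dispatch and the ±infinity mapping can run without a ResultSet. It assumes, as the proposal states, that the driver reports infinite values as LocalDateTime.MAX/MIN and OffsetDateTime.MAX/MIN.

```java
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.util.function.Function;

public class TemporalColumnDispatch {

    // Built-in PostgreSQL type OIDs (pg_type): timestamp, timestamptz, date.
    static final int OID_TIMESTAMP = 1114;
    static final int OID_TIMESTAMPTZ = 1184;
    static final int OID_DATE = 1082;

    /**
     * Reads a temporal column as a java.time value instead of java.sql.Timestamp/Date.
     * getObjectAs is a hypothetical stand-in for (cls) -> rs.getObject(columnIndex, cls).
     */
    static Object readTemporal(int oid, Function<Class<?>, Object> getObjectAs) {
        switch (oid) {
            case OID_TIMESTAMP:
                return infinityToSentinel(getObjectAs.apply(LocalDateTime.class));
            case OID_TIMESTAMPTZ:
                return infinityToSentinel(getObjectAs.apply(OffsetDateTime.class));
            case OID_DATE:
                // The issue only specifies the sentinel mapping for TIMESTAMP/TIMESTAMPTZ.
                return getObjectAs.apply(LocalDate.class);
            default:
                throw new IllegalArgumentException("not a temporal OID: " + oid);
        }
    }

    /** Maps the driver's +/-infinity values back to the legacy Timestamp sentinels. */
    static Object infinityToSentinel(Object value) {
        if (LocalDateTime.MAX.equals(value) || OffsetDateTime.MAX.equals(value)) {
            return new Timestamp(Long.MAX_VALUE);
        }
        if (LocalDateTime.MIN.equals(value) || OffsetDateTime.MIN.equals(value)) {
            return new Timestamp(Long.MIN_VALUE);
        }
        return value; // finite values stay as proleptic-Gregorian java.time objects
    }

    public static void main(String[] args) {
        LocalDateTime year1 = LocalDateTime.of(1, 1, 1, 0, 0);
        System.out.println(readTemporal(OID_TIMESTAMP, cls -> year1)); // prints 0001-01-01T00:00
        System.out.println(readTemporal(OID_TIMESTAMP, cls -> LocalDateTime.MAX).getClass().getSimpleName());
    }
}
```

Keeping finite values as java.time objects is what avoids the GregorianCalendar path; only the two infinity values are converted back to java.sql.Timestamp so that the existing sentinel contract is preserved.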

*Scope*

This issue is Postgres-only. MySQL's snapshot path uses its own per-type dispatch (MySqlSnapshotSplitReadTask#readField), and the affected value range (DATETIME 1000-9999) does not reach the Julian cutover. Oracle / Db2 / SqlServer use jdbcConnection.rowToArray, whose default getColumnValue is also rs.getObject; their column-value handling will be tracked separately if symptoms are observed there.

*Test Plan*

Add a regression test in PostgresScanFetchTaskTest that snapshots a fixture table containing boundary dates (0001-01-01, 1582-10-04, 1582-10-15, 1900-12-31, 1901-01-02, and a value with microsecond precision) and asserts that the emitted Debezium record fields match the proleptic-UTC expectation.
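
The proleptic-UTC expectations for such a fixture can be computed independently of the driver. The sketch below is a hypothetical helper, not existing test code; it assumes TIMESTAMP columns are emitted as MicroTimestamp (as in the example in this issue) and DATE columns as days since the epoch, and the microsecond-precision row is an arbitrary illustrative value. In the proleptic calendar 1582-10-04 and 1582-10-15 are 11 days apart, whereas the legacy hybrid calendar treats them as consecutive days, which is why both are useful fixture rows.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.Map;

public class ProlepticExpectations {

    /** MicroTimestamp-style value: microseconds since the epoch, wall clock read as UTC. */
    static long micros(LocalDateTime wallClock) {
        return wallClock.toEpochSecond(ZoneOffset.UTC) * 1_000_000L + wallClock.getNano() / 1_000L;
    }

    /** Date-style value: days since 1970-01-01 in the proleptic Gregorian calendar. */
    static long epochDays(LocalDate date) {
        return date.toEpochDay();
    }

    /** Boundary rows from the test plan; the "micros" row is a hypothetical example value. */
    static Map<String, LocalDateTime> fixture() {
        Map<String, LocalDateTime> rows = new LinkedHashMap<>();
        rows.put("0001-01-01", LocalDateTime.of(1, 1, 1, 0, 0));
        rows.put("1582-10-04", LocalDateTime.of(1582, 10, 4, 0, 0));
        rows.put("1582-10-15", LocalDateTime.of(1582, 10, 15, 0, 0));
        rows.put("1900-12-31", LocalDateTime.of(1900, 12, 31, 0, 0));
        rows.put("1901-01-02", LocalDateTime.of(1901, 1, 2, 0, 0));
        rows.put("micros", LocalDateTime.of(1900, 12, 31, 23, 59, 59, 123_456_000));
        return rows;
    }

    public static void main(String[] args) {
        // Print the expected Debezium field values for each fixture row.
        fixture().forEach((label, t) -> System.out.printf("%-10s micros=%d epochDays=%d%n",
                label, micros(t), epochDays(t.toLocalDate())));
    }
}
```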



--
This message was sent by Atlassian Jira
(v8.20.10#820010)