[
https://issues.apache.org/jira/browse/FLINK-39748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39748:
-----------------------------------
Labels: pull-request-available (was: )
> Postgres CDC snapshot produces wrong values for TIMESTAMP / TIMESTAMPTZ /
> DATE columns with historical dates
> ------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39748
> URL: https://issues.apache.org/jira/browse/FLINK-39748
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.6.0
> Reporter: Di Wu
> Priority: Major
> Labels: pull-request-available
>
> *Description*
>
> *Problem*
>
> When the Postgres CDC source snapshots rows containing TIMESTAMP,
> TIMESTAMPTZ, or DATE column values dated
> before the Julian/Gregorian cutover (1582-10-15), the emitted Debezium
> records carry numerically wrong
> timestamps. The same row, when later re-emitted via the streaming (logical
> decoding) path, carries the correct
> value — so snapshot and streaming records for the same primary key
> disagree, breaking idempotent UPSERT
> downstream and causing phantom updates.
>
> Example: snapshotting a row whose TIMESTAMP column holds '0001-01-01
> 00:00:00' produces a Debezium MicroTimestamp of -62135769257000000
> instead of the proleptic-UTC-correct -62135596800000000. On a JVM
> configured with Asia/Shanghai, that is a drift of 2 days minus 343
> seconds (172457 seconds).
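
The arithmetic behind the example can be checked with a minimal sketch. It uses only java.time, not the connector code; the class and method names are hypothetical. It computes the proleptic-UTC microsecond value for 0001-01-01 00:00:00 and the gap between the two values quoted in the report.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class DriftArithmetic {
    // The two values quoted in the report (epoch microseconds).
    static final long REPORTED_SNAPSHOT_MICROS = -62135769257000000L;
    static final long REPORTED_EXPECTED_MICROS = -62135596800000000L;

    /** Epoch microseconds of a wall-clock timestamp read as proleptic-Gregorian UTC. */
    static long toEpochMicros(LocalDateTime value) {
        long seconds = value.toEpochSecond(ZoneOffset.UTC);
        return Math.addExact(Math.multiplyExact(seconds, 1_000_000L), value.getNano() / 1_000);
    }

    /** Gap between the expected and the reported snapshot value, in seconds. */
    static long driftSeconds() {
        return (REPORTED_EXPECTED_MICROS - REPORTED_SNAPSHOT_MICROS) / 1_000_000L;
    }

    public static void main(String[] args) {
        long expected = toEpochMicros(LocalDateTime.of(1, 1, 1, 0, 0, 0));
        System.out.println(expected == REPORTED_EXPECTED_MICROS); // true
        System.out.println(driftSeconds());                       // 172457
        System.out.println(2 * 86_400L - 343L);                   // 172457
    }
}
```

The gap of 172457 seconds equals 2 days (172800 seconds) minus 343 seconds.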
>
> *Root Cause*
>
> PostgresScanFetchTask#createDataEventsForTable reads column values via a
> bare rs.getObject(i + 1). For temporal
> columns, the PG JDBC driver returns java.sql.Timestamp / java.sql.Date
> constructed through
> java.util.GregorianCalendar (default Julian/Gregorian cutover at
> 1582-10-15) using the JVM default time zone.
> This introduces two independent drifts:
>
> 1. *Julian/Proleptic-Gregorian cutover*: values before 1582-10-15 are
> interpreted as Julian, while PostgreSQL stores proleptic Gregorian. The
> shift is a whole number of days that depends on the year (2 days at
> year 0001).
> 2. *Local Mean Time (LMT)*: when the JVM default time zone has an LMT
> segment (e.g. Asia/Shanghai uses LMT +08:05:43 until 1901-01-01), values
> in that segment are further offset by the LMT delta (343 seconds for
> Shanghai).
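
Each of the two components can be measured in isolation with standard JDK classes. The sketch below is illustrative only: the class and method names are hypothetical, and it does not reproduce the exact conversion chain inside the PG JDBC driver or Debezium. It compares the hybrid GregorianCalendar reading of 0001-01-01 with the proleptic java.time reading, and looks up the LMT delta of a zone from the JDK's time zone rules.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.zone.ZoneRules;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.TimeZone;

public class TwoDrifts {
    /** Seconds between the proleptic-Gregorian and hybrid (Julian) reading of 0001-01-01, both in UTC. */
    static long calendarDriftSeconds() {
        GregorianCalendar hybrid = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        hybrid.clear(); // drop the current time so only the fields set below count
        hybrid.set(1, Calendar.JANUARY, 1, 0, 0, 0);
        long hybridSeconds = hybrid.getTimeInMillis() / 1000L;
        long prolepticSeconds = LocalDateTime.of(1, 1, 1, 0, 0, 0).toEpochSecond(ZoneOffset.UTC);
        return prolepticSeconds - hybridSeconds;
    }

    /** Difference between a zone's offset in 1900 and its offset in early 1901, in seconds. */
    static int lmtDeltaSeconds(String zoneId) {
        ZoneRules rules = ZoneId.of(zoneId).getRules();
        int before = rules.getOffset(LocalDateTime.of(1900, 1, 1, 0, 0)).getTotalSeconds();
        int after = rules.getOffset(LocalDateTime.of(1901, 1, 2, 0, 0)).getTotalSeconds();
        return before - after;
    }

    public static void main(String[] args) {
        System.out.println(calendarDriftSeconds());           // 172800 (2 days)
        System.out.println(lmtDeltaSeconds("Asia/Shanghai")); // 343
        System.out.println(lmtDeltaSeconds("UTC"));           // 0
    }
}
```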
>
> The streaming path goes through Postgres logical decoding, which does not
> pass through GregorianCalendar, so
> the bug is silent until snapshot and streaming records for the same row are
> compared.
>
> Additionally, by calling rs.getObject directly, PostgresScanFetchTask
> bypasses the per-type dispatch in PostgresConnection#getColumnValue
> (which already handles MONEY / BIT / NUMERIC / TIME / TIMETZ correctly).
> Debezium's own
> RelationalSnapshotChangeEventSource#createDataEventsForTable does not
> have this gap, because it delegates to
> jdbcConnection.rowToArray → getColumnValue.
>
> *Reproduction*
>
> 1. Create a Postgres table with a TIMESTAMP column.
> 2. Insert '0001-01-01 00:00:00'.
> 3. Start a Flink CDC Postgres source.
> 4. Compare the snapshot record's MicroTimestamp value with that of the
> same row re-captured via logical decoding (e.g. by updating it and
> re-reading).
> 5. The two microsecond values differ by 2 days − 343 seconds (on an
> Asia/Shanghai JVM) or by 2 days (on a UTC JVM).
>
> *Proposed Fix*
>
> 1. In PostgresScanFetchTask, replace rs.getObject(i + 1) with
> jdbcConnection.getColumnValue(rs, i + 1, column,
> table, databaseSchema) so the snapshot path goes through the same per-type
> dispatch already used by Debezium's
> snapshot framework.
> 2. In PostgresConnection#getColumnValue, extend the existing switch
> (type.getOid()) to also handle
> PgOid.TIMESTAMP / TIMESTAMPTZ / DATE, reading them as
> java.time.LocalDateTime / OffsetDateTime / LocalDate via
> rs.getObject(columnIndex, ...class). This bypasses the legacy
> GregorianCalendar path while preserving the
> existing ±infinity sentinel contract by mapping LocalDateTime.MAX/MIN and
> OffsetDateTime.MAX/MIN back to
> Timestamp(Long.MAX_VALUE / Long.MIN_VALUE).
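
The sentinel mapping in step 2 could look roughly like the sketch below. This is not the actual patch: the class and method names are hypothetical, and the sketch assumes the value was already read with rs.getObject(columnIndex, LocalDateTime.class), so that it runs without a database.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

public class InfinitySentinels {
    /**
     * Hypothetical helper: keeps ordinary values as LocalDateTime (no
     * GregorianCalendar involved) and maps the MAX/MIN sentinels back to
     * the legacy Timestamp sentinels described in the proposal.
     */
    static Object normalizeTimestamp(LocalDateTime value) {
        if (value == null) {
            return null; // SQL NULL stays null
        }
        if (LocalDateTime.MAX.equals(value)) {
            return new Timestamp(Long.MAX_VALUE); // +infinity sentinel
        }
        if (LocalDateTime.MIN.equals(value)) {
            return new Timestamp(Long.MIN_VALUE); // -infinity sentinel
        }
        return value;
    }

    public static void main(String[] args) {
        LocalDateTime historical = LocalDateTime.of(1, 1, 1, 0, 0, 0);
        System.out.println(normalizeTimestamp(historical));        // 0001-01-01T00:00
        System.out.println(normalizeTimestamp(LocalDateTime.MAX) instanceof Timestamp); // true
    }
}
```

The same pattern would apply to OffsetDateTime.MAX/MIN for TIMESTAMPTZ columns.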
>
> *Scope*
>
> This issue is Postgres-only. MySQL's snapshot path uses its own per-type
> dispatch (MySqlSnapshotSplitReadTask#readField), and the affected value
> range (DATETIME 1000-9999) does not reach the Julian cutover. Oracle /
> Db2 / SqlServer use jdbcConnection.rowToArray, whose default
> getColumnValue is also a plain rs.getObject; their column-value handling
> will be tracked separately if symptoms are observed there.
>
> *Test Plan*
>
> Add a regression test in PostgresScanFetchTaskTest that snapshots a fixture
> table containing boundary dates
> (0001-01-01, 1582-10-04, 1582-10-15, 1900-12-31, 1901-01-02, and a value
> with microsecond precision) and
> asserts that the emitted Debezium record fields match the proleptic-UTC
> expectation.
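
Expected values for the boundary dates can be derived with java.time alone. The sketch below is not the proposed PostgresScanFetchTaskTest; the class and method names are hypothetical. It shows why 1582-10-04 and 1582-10-15 make useful fixtures: the proleptic calendar puts 11 days between them, while the hybrid GregorianCalendar puts only 1.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.TimeZone;

public class BoundaryDates {
    /** Days between 1582-10-04 and 1582-10-15 in the proleptic Gregorian calendar. */
    static long prolepticGapDays() {
        return ChronoUnit.DAYS.between(LocalDate.of(1582, 10, 4), LocalDate.of(1582, 10, 15));
    }

    /** The same gap in the hybrid Julian/Gregorian calendar used by GregorianCalendar. */
    static long hybridGapDays() {
        return (hybridMillis(1582, Calendar.OCTOBER, 15) - hybridMillis(1582, Calendar.OCTOBER, 4))
                / 86_400_000L;
    }

    private static long hybridMillis(int year, int month, int day) {
        GregorianCalendar cal = new GregorianCalendar(TimeZone.getTimeZone("UTC"));
        cal.clear();
        cal.set(year, month, day, 0, 0, 0);
        return cal.getTimeInMillis();
    }

    public static void main(String[] args) {
        System.out.println(prolepticGapDays());                   // 11
        System.out.println(hybridGapDays());                      // 1
        System.out.println(LocalDate.of(1, 1, 1).toEpochDay());   // -719162
        System.out.println(LocalDate.of(1582, 10, 15).toEpochDay()); // -141427
    }
}
```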