Avichay Marciano created FLINK-39415:
----------------------------------------

             Summary: [flink-cdc-postgres] TIMESTAMPTZ type mapping causes 
NumberFormatException in Pipeline connector with Iceberg sink
                 Key: FLINK-39415
                 URL: https://issues.apache.org/jira/browse/FLINK-39415
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: cdc-3.6.0
         Environment: Validated end-to-end on EKS with Flink 1.20 + CDC 3.6.0: 
TIMESTAMPTZ data successfully written to Apache Iceberg (S3 Tables) with 
correct microsecond precision and UTC timezone.
            Reporter: Avichay Marciano


h3. Problem

When using the Flink CDC Pipeline connector (YAML mode) to replicate a 
PostgreSQL table containing {{TIMESTAMPTZ}} columns to an Iceberg sink, the job 
crashes with {{NumberFormatException}} during both snapshot and CDC phases.
h3. Root Cause

{{PostgresTypeUtils}} maps {{TIMESTAMPTZ}} ({{{}PgOid.TIMESTAMPTZ{}}}) to 
{{ZonedTimestampType}} ({{{}TIMESTAMP_WITH_TIME_ZONE{}}}), but the Debezium 
deserializer ({{{}DebeziumEventDeserializationSchema{}}}) only has a converter 
for {{{}TIMESTAMP_WITH_LOCAL_TIME_ZONE{}}}. This type mismatch causes:
 # The deserializer produces {{LocalZonedTimestampData}} for the field
 # {{AbstractBinaryWriter.write()}} expects {{ZonedTimestampData}} based on the 
declared type
 # Binary data gets corrupted by field-offset misalignment in 
{{BinaryRecordData}}
 # {{BinaryRecordData.getZonedTimestamp()}} reads garbage data, resulting in 
{{NumberFormatException}}

The existing test {{PostgresFullTypesITCase}} already expects 
{{DataTypes.TIMESTAMP_LTZ(0)}} for {{{}TIMESTAMPTZ{}}}, confirming that the 
correct type mapping is {{{}TIMESTAMP_WITH_LOCAL_TIME_ZONE{}}}.
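For context on why {{TIMESTAMP_LTZ}} is the right target: PostgreSQL normalizes a {{TIMESTAMPTZ}} value to a UTC instant on write and discards the input offset, which matches {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} (instant) semantics rather than {{TIMESTAMP_WITH_TIME_ZONE}}. A minimal {{java.time}} sketch of that normalization (class name is illustrative):
{code:java}
import java.time.Instant;
import java.time.OffsetDateTime;

public class TimestamptzIsAnInstant {
    public static void main(String[] args) {
        // PostgreSQL stores a TIMESTAMPTZ as a UTC instant; the offset the
        // client wrote with is not kept. Parsing the repro literal and
        // dropping the offset yields the instant PostgreSQL actually stores:
        OffsetDateTime written = OffsetDateTime.parse("2026-03-31T12:03:46.125062+00:00");
        Instant stored = written.toInstant();
        System.out.println(stored); // 2026-03-31T12:03:46.125062Z
    }
}
{code}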
h3. Steps to Reproduce
{code:sql}
CREATE TABLE test_tz (
    id INT PRIMARY KEY,
    name TEXT,
    created_at TIMESTAMPTZ DEFAULT now()
);
ALTER TABLE test_tz REPLICA IDENTITY FULL;
INSERT INTO test_tz VALUES (1, 'test', '2026-03-31 12:03:46.125062+00');
{code}
Configure a Flink CDC Pipeline YAML job with a PostgreSQL source → Iceberg sink 
targeting the table above. The job crashes immediately during the snapshot 
phase:
{code:java}
java.lang.NumberFormatException
    at BinaryRecordData.getZonedTimestamp()
    at IcebergTypeUtils.createFieldGetter()
{code}
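The pipeline definition used for reproduction can be sketched roughly as below. Host, credentials, and the exact option keys (in particular the Iceberg sink's catalog settings for S3 Tables) are illustrative; take the authoritative keys from the Postgres and Iceberg pipeline connector documentation:
{code:yaml}
source:
  type: postgres
  hostname: postgres-host        # illustrative
  port: 5432
  username: flink
  password: "***"
  tables: public.test_tz
  slot.name: flink_cdc_slot
  decoding.plugin.name: pgoutput

sink:
  type: iceberg
  # catalog/warehouse options depend on the deployment (e.g. S3 Tables);
  # see the Iceberg pipeline connector docs for the exact keys

pipeline:
  name: postgres-to-iceberg-timestamptz-repro
  parallelism: 1
{code}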
h3. Workaround

Use {{TIMESTAMP}} (without time zone) instead of {{{}TIMESTAMPTZ{}}}, or use 
Flink SQL CDC mode instead of Pipeline YAML mode.
h3. Fix

The fix changes {{PostgresTypeUtils}} to map {{TIMESTAMPTZ}} → 
{{DataTypes.TIMESTAMP_LTZ(scale)}} instead of {{{}ZonedTimestampType{}}}. This 
aligns with the Debezium deserializer's existing converter and matches the 
expectation in {{{}PostgresFullTypesITCase{}}}.

Additionally, the fix adds {{convertToZonedTimestamp()}} to 
{{DebeziumEventDeserializationSchema}} for future {{TIMESTAMP_WITH_TIME_ZONE}} 
support.
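Debezium encodes a PostgreSQL {{TIMESTAMPTZ}} as {{io.debezium.time.ZonedTimestamp}}, i.e. an ISO-8601 string that carries an offset. A hedged sketch of what such a {{convertToZonedTimestamp()}} helper has to do, using only {{java.time}} (the real deserializer wraps the result in Flink CDC's {{ZonedTimestampData}}; class name and error handling here are illustrative):
{code:java}
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class ZonedTimestampSketch {
    // Hypothetical helper: parse Debezium's ISO-8601 ZonedTimestamp payload
    // (e.g. "2026-03-31T12:03:46.125062Z") back into a ZonedDateTime that a
    // ZonedTimestampData could wrap.
    static ZonedDateTime convertToZonedTimestamp(Object dbzValue) {
        if (dbzValue instanceof String) {
            return ZonedDateTime.parse((String) dbzValue,
                    DateTimeFormatter.ISO_OFFSET_DATE_TIME);
        }
        throw new IllegalArgumentException(
                "Unexpected TIMESTAMPTZ payload: " + dbzValue);
    }

    public static void main(String[] args) {
        ZonedDateTime zdt = convertToZonedTimestamp("2026-03-31T12:03:46.125062Z");
        System.out.println(zdt.toInstant()); // 2026-03-31T12:03:46.125062Z
    }
}
{code}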

*Files changed:*
 * {{{}PostgresTypeUtils.java{}}}: {{TIMESTAMPTZ}} → {{TIMESTAMP_LTZ(scale)}} 
(was {{{}ZonedTimestampType{}}})
 * {{{}DebeziumEventDeserializationSchema.java{}}}: add 
{{convertToZonedTimestamp()}}
 * New: {{PostgresTypeUtilsTimestamptzTest.java}} — validates type mapping
 * {{IcebergTypeUtilsTest.java}} — validates field getter for {{TIMESTAMP_LTZ}}

Branch: 
[https://github.com/avichaym/flink-cdc/tree/fix/timestamptz-iceberg-sink]




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
