Avichay Marciano created FLINK-39415:
----------------------------------------
Summary: [flink-cdc-postgres] TIMESTAMPTZ type mapping causes
NumberFormatException in Pipeline connector with Iceberg sink
Key: FLINK-39415
URL: https://issues.apache.org/jira/browse/FLINK-39415
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.6.0
Environment: EKS, Flink 1.20, Flink CDC 3.6.0; PostgreSQL source, Apache
Iceberg (S3 Tables) sink
Reporter: Avichay Marciano
h3. Problem
When using the Flink CDC Pipeline connector (YAML mode) to replicate a
PostgreSQL table containing {{TIMESTAMPTZ}} columns to an Iceberg sink, the job
crashes with {{NumberFormatException}} during both snapshot and CDC phases.
h3. Root Cause
{{PostgresTypeUtils}} maps {{TIMESTAMPTZ}} ({{PgOid.TIMESTAMPTZ}}) to
{{ZonedTimestampType}} ({{TIMESTAMP_WITH_TIME_ZONE}}), but the Debezium
deserializer ({{DebeziumEventDeserializationSchema}}) only has a converter
for {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}}. This type mismatch causes:
# The deserializer produces {{LocalZonedTimestampData}} for the field.
# {{AbstractBinaryWriter.write()}} expects {{ZonedTimestampData}} based on the
declared type.
# The field offsets in {{BinaryRecordData}} become misaligned, corrupting the
binary layout.
# {{BinaryRecordData.getZonedTimestamp()}} reads garbage bytes and throws
{{NumberFormatException}}.
The existing test {{PostgresFullTypesITCase}} already expects
{{DataTypes.TIMESTAMP_LTZ(0)}} for {{TIMESTAMPTZ}}, confirming the correct
type mapping should be {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}}.
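The semantic distinction behind the fix can be illustrated with plain {{java.time}} (this sketch uses no Flink or Debezium classes and is purely illustrative): PostgreSQL normalizes {{TIMESTAMPTZ}} input to UTC and stores only the instant, which is exactly the {{TIMESTAMP_LTZ}} semantic, whereas {{TIMESTAMP_WITH_TIME_ZONE}} would additionally preserve the writer's offset.
{code:java}
import java.time.Instant;
import java.time.OffsetDateTime;

public class TimestamptzSemantics {
    // PostgreSQL TIMESTAMPTZ keeps no offset: input is normalized to UTC and
    // only the instant is stored. That matches Flink's TIMESTAMP_LTZ (an
    // instant), not TIMESTAMP_WITH_TIME_ZONE (instant plus original offset).
    static Instant normalize(String literalWithOffset) {
        return OffsetDateTime.parse(literalWithOffset).toInstant();
    }

    public static void main(String[] args) {
        // The +02:00 offset is discarded; microsecond precision survives.
        System.out.println(normalize("2026-03-31T14:03:46.125062+02:00"));
        // 2026-03-31T12:03:46.125062Z
    }
}
{code}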
h3. Steps to Reproduce
{code:sql}
CREATE TABLE test_tz (
id INT PRIMARY KEY,
name TEXT,
created_at TIMESTAMPTZ DEFAULT now()
);
ALTER TABLE test_tz REPLICA IDENTITY FULL;
INSERT INTO test_tz VALUES (1, 'test', '2026-03-31 12:03:46.125062+00');
{code}
Configure a Flink CDC Pipeline YAML job with a PostgreSQL source → Iceberg sink
targeting the table above. Submit the job; it crashes immediately during the
snapshot phase:
{code:java}
java.lang.NumberFormatException
at BinaryRecordData.getZonedTimestamp()
at IcebergTypeUtils.createFieldGetter()
{code}
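For reference, the pipeline definition has roughly the following shape (hostnames, credentials, and sink options are placeholders; exact option names should be checked against the PostgreSQL and Iceberg Pipeline connector docs):
{code:yaml}
source:
  type: postgres
  hostname: postgres.example.internal   # placeholder
  port: 5432
  username: flink_cdc                   # placeholder
  password: "***"
  tables: public.test_tz
  slot.name: flink_cdc_slot

sink:
  type: iceberg
  # catalog / warehouse options omitted; see the Iceberg Pipeline connector docs

pipeline:
  name: postgres-to-iceberg-timestamptz-repro
  parallelism: 1
{code}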
h3. Workaround
Use {{TIMESTAMP}} (without time zone) instead of {{TIMESTAMPTZ}}, or use
Flink SQL CDC mode instead of Pipeline YAML mode.
h3. Fix
The fix changes {{PostgresTypeUtils}} to map {{TIMESTAMPTZ}} →
{{DataTypes.TIMESTAMP_LTZ(scale)}} instead of {{ZonedTimestampType}}. This
aligns with the Debezium deserializer's existing converter and matches the
expectation in {{PostgresFullTypesITCase}}.
Additionally, it adds {{convertToZonedTimestamp()}} to
{{DebeziumEventDeserializationSchema}} for future {{TIMESTAMP_WITH_TIME_ZONE}}
support.
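Downstream, Iceberg stores {{timestamptz}} physically as microseconds since the UTC epoch, so mapping to {{TIMESTAMP_LTZ}} round-trips cleanly. A hypothetical standalone sketch of that conversion (not the actual connector code; Debezium encodes {{TIMESTAMPTZ}} as an ISO-8601 offset string via {{io.debezium.time.ZonedTimestamp}}):
{code:java}
import java.time.Instant;
import java.time.OffsetDateTime;

public class ZonedTimestampToMicros {
    // Hypothetical sketch: parse a Debezium-style ISO-8601 offset string and
    // convert it to epoch microseconds, the physical form Iceberg uses for
    // timestamptz. The offset is dropped, i.e. TIMESTAMP_LTZ semantics.
    static long toEpochMicros(String debeziumZonedTimestamp) {
        Instant instant = OffsetDateTime.parse(debeziumZonedTimestamp).toInstant();
        return Math.multiplyExact(instant.getEpochSecond(), 1_000_000L)
                + instant.getNano() / 1_000;
    }

    public static void main(String[] args) {
        System.out.println(toEpochMicros("2026-03-31T12:03:46.125062Z"));
    }
}
{code}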
*Files changed:*
* {{PostgresTypeUtils.java}}: {{TIMESTAMPTZ}} → {{TIMESTAMP_LTZ(scale)}}
(was {{ZonedTimestampType}})
* {{DebeziumEventDeserializationSchema.java}}: add
{{convertToZonedTimestamp()}}
* New: {{PostgresTypeUtilsTimestamptzTest.java}}: validates the type mapping
* {{IcebergTypeUtilsTest.java}}: validates the field getter for {{TIMESTAMP_LTZ}}
Branch:
[https://github.com/avichaym/flink-cdc/tree/fix/timestamptz-iceberg-sink]
Validated end-to-end on EKS with Flink 1.20 + CDC 3.6.0: TIMESTAMPTZ data
successfully written to Apache Iceberg (S3 Tables) with correct microsecond
precision and UTC timezone.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)