A S Rakesh Krishna created FLINK-38742:
------------------------------------------

             Summary: Postgres CDC pipeline fails when there is a column of type 
TIMESTAMPTZ (timestamp with time zone)
                 Key: FLINK-38742
                 URL: https://issues.apache.org/jira/browse/FLINK-38742
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: 1.20.3
         Environment: Flink-kubernetes-operator - 1.13
Flink - 1.20.3
Flink-cdc - 3.5.0

```
FROM apache/flink:1.20.3-java17

# Declared before first use so the instructions below do not depend on the
# base image defining FLINK_HOME. Two separate ENV instructions on purpose:
# a variable assigned in an ENV instruction is not visible to other
# assignments in that same instruction.
ENV FLINK_HOME=/opt/flink
ENV PATH=$FLINK_HOME/bin:$PATH

WORKDIR /opt/flink

# Package installation and copying of local artifacts need root.
USER root

# Install Python for the Flink Python API (if needed).
RUN apt-get update && \
    apt-get install -y --no-install-recommends python3 python3-pip && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Directory for custom (user) JARs, owned by the runtime user.
RUN mkdir -p /opt/flink/usrlib && \
    chown -R flink:flink /opt/flink/usrlib

# Local JARs from the build context are plain files, so COPY rather than ADD.
COPY *.jar $FLINK_HOME/lib/

# ADD is used deliberately here: it auto-extracts the local tarball.
ADD flink-cdc-3.5.0-bin.tar.gz $FLINK_HOME/
RUN mv $FLINK_HOME/flink-cdc-3.5.0/lib/flink-cdc-dist-3.5.0.jar $FLINK_HOME/lib/

# All downloads below run as the unprivileged runtime user.
USER flink

# Each wget below fetches several URLs into one layer; a failed download makes
# wget exit non-zero and so fails the build. -nv keeps logs short but still
# prints errors.

# Flink CDC 3.5.0: pipeline connectors (MySQL, Postgres, Iceberg sink) and
# SQL CDC connectors.
# NOTE(review): the flink-sql-connector-*-cdc jars and the pipeline connectors
# may package overlapping classes; confirm both sets are needed in lib/.
RUN wget -nv -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.5.0/flink-cdc-pipeline-connector-mysql-3.5.0.jar \
    https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-postgres/3.5.0/flink-cdc-pipeline-connector-postgres-3.5.0.jar \
    https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-iceberg/3.5.0/flink-cdc-pipeline-connector-iceberg-3.5.0.jar \
    https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.5.0/flink-sql-connector-mysql-cdc-3.5.0.jar \
    https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.5.0/flink-sql-connector-postgres-cdc-3.5.0.jar \
    https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mongodb-cdc/3.5.0/flink-sql-connector-mongodb-cdc-3.5.0.jar

# JDBC drivers: MySQL Connector/J and PostgreSQL.
RUN wget -nv -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar \
    https://jdbc.postgresql.org/download/postgresql-42.6.0.jar

# Iceberg: Flink runtime built for Flink 1.20 (iceberg-flink-runtime-1.20,
# Iceberg 1.9.2) plus Hive catalog and AWS integration.
# NOTE(review): this mixes Iceberg 1.9.2, 1.7.2 and 0.9.1 artifacts; mixed
# Iceberg versions on one classpath can conflict — consider aligning on 1.9.2.
RUN wget -nv -P /opt/flink/lib/ \
    https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.20/1.9.2/iceberg-flink-runtime-1.20-1.9.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-aws/1.9.2/iceberg-aws-1.9.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-hive-metastore/1.9.2/iceberg-hive-metastore-1.9.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-hive-runtime/1.7.2/iceberg-hive-runtime-1.7.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-hive3/1.7.2/iceberg-hive3-1.7.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-hive/0.9.1/iceberg-hive-0.9.1.jar

# Hive Metastore dependencies (Hive 3.1.3 and Thrift 0.9.3).
RUN wget -nv -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/org/apache/hive/hive-metastore/3.1.3/hive-metastore-3.1.3.jar \
    https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar \
    https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar \
    https://repo.maven.apache.org/maven2/org/apache/thrift/libthrift/0.9.3/libthrift-0.9.3.jar

# AWS/Hadoop dependencies (from /hive-lib/aws/).
RUN wget -nv -P /opt/flink/lib/ \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-aws/3.4.2/hadoop-aws-3.4.2.jar \
    https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.29.52/bundle-2.29.52.jar \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-common/3.4.2/hadoop-common-3.4.2.jar \
    https://repo1.maven.org/maven2/org/wildfly/openssl/wildfly-openssl/1.0.7.Final/wildfly-openssl-1.0.7.Final.jar

# Further Hadoop 3.4.2 modules (client runtime, auth, HDFS, MapReduce).
RUN wget -nv -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.4.2/hadoop-client-runtime-3.4.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-auth/3.4.2/hadoop-auth-3.4.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-hdfs/3.4.2/hadoop-hdfs-3.4.2.jar \
    https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs-client/3.4.2/hadoop-hdfs-client-3.4.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-core/3.4.2/hadoop-mapreduce-client-core-3.4.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-common/3.4.2/hadoop-mapreduce-client-common-3.4.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-jobclient/3.4.2/hadoop-mapreduce-client-jobclient-3.4.2.jar

# Commons dependencies (from /hive-lib/commons/), plus commons-configuration2
# and commons-collections4.
RUN wget -nv -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/commons-configuration/commons-configuration/1.10/commons-configuration-1.10.jar \
    https://repo1.maven.org/maven2/commons-lang/commons-lang/2.6/commons-lang-2.6.jar \
    https://repo1.maven.org/maven2/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar \
    https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar \
    https://repo1.maven.org/maven2/org/apache/commons/commons-configuration2/2.9.0/commons-configuration2-2.9.0.jar \
    https://repo.maven.apache.org/maven2/org/apache/commons/commons-collections4/4.5.0/commons-collections4-4.5.0.jar

# XML parsing: Woodstox and StAX2 API.
RUN wget -nv -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/com/fasterxml/woodstox/woodstox-core/7.1.1/woodstox-core-7.1.1.jar \
    https://repo1.maven.org/maven2/org/codehaus/woodstox/stax2-api/4.2.2/stax2-api-4.2.2.jar

# NOTE(review): the base image ships flink-dist-1.20.3.jar, which presumably
# already bundles the flink-core classes; a second flink-core jar in lib/
# duplicates them. Kept to preserve the reported environment — confirm
# whether it is actually needed.
RUN wget -nv -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/org/apache/flink/flink-core/1.20.3/flink-core-1.20.3.jar

# Disabled (kept for reference):
# RUN wget -P /opt/flink/lib/ \
#     https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

# S3 filesystems are installed as Flink plugins, one sub-directory per plugin
# under $FLINK_HOME/plugins, at the Flink version of the image (1.20.3).
RUN mkdir -p $FLINK_HOME/plugins/s3-fs-hadoop $FLINK_HOME/plugins/s3-fs-presto && \
    wget -nv -P $FLINK_HOME/plugins/s3-fs-hadoop/ \
        https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.20.3/flink-s3-fs-hadoop-1.20.3.jar && \
    wget -nv -P $FLINK_HOME/plugins/s3-fs-presto/ \
        https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.3/flink-s3-fs-presto-1.20.3.jar

# Disabled: the S3 filesystem is installed under plugins/ (above), not lib/.
# RUN wget -P /opt/flink/lib/ \
#     https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.20.3/flink-s3-fs-hadoop-1.20.3.jar

# Healthcheck: probe the Flink REST API (/overview on port 8081).
# NOTE(review): this relies on curl being present in the base image (wget is,
# as used above) — confirm, or switch the probe to `wget -q --spider`.
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
    CMD curl -f http://localhost:8081/overview || exit 1

```
            Reporter: A S Rakesh Krishna


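In a Postgres database, if a table has a column of type TIMESTAMPTZ, the
Postgres CDC pipeline fails with the error below. The exception is raised in
the Iceberg sink while it reads the TIMESTAMPTZ field:
`BinaryRecordData.getZonedTimestamp` calls `Long.parseLong` on a string that
appears to contain raw record bytes and another column's JSON value rather
than a timestamp.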

```

org.apache.flink.cdc.runtime.operators.sink.exception.SinkWrapperException: 
Failed to handle event DataChangeEvent{tableId=public.guest_messages, 
before=null, 
after=org.apache.flink.cdc.common.data.binary.BinaryRecordData@d8129cef, 
op=INSERT, meta=()} in DataSink wrapper.

at 
org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:185)
 ~[flink-cdc-dist-3.5.0.jar:3.5.0]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
 ~[flink-dist-1.20.3.jar:1.20.3]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
 ~[flink-dist-1.20.3.jar:1.20.3]

at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
 ~[flink-dist-1.20.3.jar:1.20.3]

at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
 ~[flink-dist-1.20.3.jar:1.20.3]

at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
 ~[flink-dist-1.20.3.jar:1.20.3]

at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
 ~[flink-dist-1.20.3.jar:1.20.3]

at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
 ~[flink-dist-1.20.3.jar:1.20.3]

at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-dist-1.20.3.jar:1.20.3]

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
 ~[flink-dist-1.20.3.jar:1.20.3]

at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
 ~[flink-dist-1.20.3.jar:1.20.3]

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
 ~[flink-dist-1.20.3.jar:1.20.3]

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) 
~[flink-dist-1.20.3.jar:1.20.3]

at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
 ~[flink-dist-1.20.3.jar:1.20.3]

at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) 
~[flink-dist-1.20.3.jar:1.20.3]

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) 
~[flink-dist-1.20.3.jar:1.20.3]

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) 
~[flink-dist-1.20.3.jar:1.20.3]

at java.base/java.lang.Thread.run(Unknown Source) ~[?:?]

Caused by: java.lang.NumberFormatException: For input string: 
"�ݩ���\{"message": "Hello! How can I assist you today?", "type": "text"}

 

at java.base/java.lang.NumberFormatException.forInputString(Unknown Source) 
~[?:?]

at java.base/java.lang.Long.parseLong(Unknown Source) ~[?:?]

at java.base/java.lang.Long.parseLong(Unknown Source) ~[?:?]

at 
org.apache.flink.cdc.common.data.binary.BinaryRecordData.getZonedTimestamp(BinaryRecordData.java:182)
 ~[flink-cdc-dist-3.5.0.jar:3.5.0]

at 
org.apache.flink.cdc.connectors.iceberg.sink.utils.IcebergTypeUtils.lambda$createFieldGetter$56253627$3(IcebergTypeUtils.java:184)
 ~[flink-cdc-pipeline-connector-iceberg-3.5.0.jar:3.5.0]

at 
org.apache.flink.cdc.connectors.iceberg.sink.utils.RowDataUtils.convertDataChangeEventToRowData(RowDataUtils.java:57)
 ~[flink-cdc-pipeline-connector-iceberg-3.5.0.jar:3.5.0]

at 
org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergWriter.write(IcebergWriter.java:128)
 ~[flink-cdc-pipeline-connector-iceberg-3.5.0.jar:3.5.0]

at 
org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergWriter.write(IcebergWriter.java:54)
 ~[flink-cdc-pipeline-connector-iceberg-3.5.0.jar:3.5.0]

at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:166)
 ~[flink-dist-1.20.3.jar:1.20.3]

at 
org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:183)
 ~[flink-cdc-dist-3.5.0.jar:3.5.0]

... 17 more

2025-11-24 10:34:24,977 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - 
6 tasks will be restarted to recover the failed task 
7bcc05f0c397463a6b3a8efd8a997b67_0deb1b26a3d9eb3c8f0c11f7110b2903_1_0.

2025-11-24 10:34:24,977 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Sync Postgres 
Database to Iceberg (16297cf52307ec4ddb514ce5dd0af71f) switched from state 
RUNNING to RESTARTING.

2025-11-24 10:34:24,980 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - PostPartition -> 
Sink Writer: Iceberg Sink (1/2) 
(7bcc05f0c397463a6b3a8efd8a997b67_0deb1b26a3d9eb3c8f0c11f7110b2903_0_0) 
switched from RUNNING to CANCELING.

2025-11-24 10:34:24,982 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink Committer: 
Iceberg Sink (1/2) 
(7bcc05f0c397463a6b3a8efd8a997b67_26351f8267c5887c12c827914f3a91a9_0_0) 
switched from RUNNING to CANCELING.

2025-11-24 10:34:24,982 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink Committer: 
Iceberg Sink (2/2) 
(7bcc05f0c397463a6b3a8efd8a997b67_26351f8267c5887c12c827914f3a91a9_1_0) 
switched from RUNNING to CANCELING.
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to