A S Rakesh Krishna created FLINK-38742:
------------------------------------------
Summary: Postgres CDC pipeline fails when there is a column of type
TIMESTAMPTZ (timestamp with time zone)
Key: FLINK-38742
URL: https://issues.apache.org/jira/browse/FLINK-38742
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: 1.20.3
Environment: Flink-kubernetes-operator - 1.13
Flink - 1.20.3
Flink-cdc - 3.5.0
```
# Base image: Flink 1.20.3 on Java 17.
FROM apache/flink:1.20.3-java17
# Set working directory
WORKDIR /opt/flink
# Package installation and the file moves below are performed as root.
USER root
# Install Python for Flink Python API (if needed).
# The apt package lists are removed in the same layer so they do not end up in the image.
RUN apt-get update && \
    apt-get install -y python3 python3-pip && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*
# Create the directory for custom JARs and hand it over to the flink user.
RUN mkdir -p /opt/flink/usrlib && \
    chown -R flink:flink /opt/flink/usrlib
# NOTE(review): FLINK_HOME is used here before this file declares it (the ENV
# line is near the end), so it is presumably provided by the base image - confirm.
# Plain local files from the build context: COPY is sufficient, ADD's extra
# behaviours (archive extraction, URL fetch) are not needed for *.jar.
COPY *.jar $FLINK_HOME/lib/
# ADD is intentional here: the following RUN expects the local tarball to have
# been unpacked into $FLINK_HOME/flink-cdc-3.5.0/.
ADD flink-cdc-3.5.0-bin.tar.gz $FLINK_HOME/
# Put the Flink CDC dist jar next to the other Flink libraries.
RUN mv $FLINK_HOME/flink-cdc-3.5.0/lib/flink-cdc-dist-3.5.0.jar $FLINK_HOME/lib/
# Drop privileges: everything from here on runs as the flink user.
USER flink
# Flink CDC 3.5.0 connectors: pipeline connectors (MySQL, Postgres, Iceberg)
# and SQL connectors (MySQL, Postgres, MongoDB), fetched in a single layer.
RUN wget -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.5.0/flink-cdc-pipeline-connector-mysql-3.5.0.jar \
    https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-postgres/3.5.0/flink-cdc-pipeline-connector-postgres-3.5.0.jar \
    https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-iceberg/3.5.0/flink-cdc-pipeline-connector-iceberg-3.5.0.jar \
    https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.5.0/flink-sql-connector-mysql-cdc-3.5.0.jar \
    https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-postgres-cdc/3.5.0/flink-sql-connector-postgres-cdc-3.5.0.jar \
    https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mongodb-cdc/3.5.0/flink-sql-connector-mongodb-cdc-3.5.0.jar
# MySQL Connector/J JDBC driver
RUN wget -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar
# Iceberg Flink runtime 1.9.2, the artifact built for Flink 1.20
RUN wget -P /opt/flink/lib/ \
    https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.20/1.9.2/iceberg-flink-runtime-1.20-1.9.2.jar
# Hive Metastore Dependencies (Hive 3.1.3)
RUN wget -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/org/apache/hive/hive-metastore/3.1.3/hive-metastore-3.1.3.jar \
    https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar
# PostgreSQL JDBC driver
RUN wget -P /opt/flink/lib/ \
    https://jdbc.postgresql.org/download/postgresql-42.6.0.jar
# AWS/Hadoop Dependencies (from /hive-lib/aws/): hadoop-aws and hadoop-common
# 3.4.2, the AWS SDK v2 bundle and wildfly-openssl, fetched in a single layer.
RUN wget -P /opt/flink/lib/ \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-aws/3.4.2/hadoop-aws-3.4.2.jar \
    https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.29.52/bundle-2.29.52.jar \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-common/3.4.2/hadoop-common-3.4.2.jar \
    https://repo1.maven.org/maven2/org/wildfly/openssl/wildfly-openssl/1.0.7.Final/wildfly-openssl-1.0.7.Final.jar
# Commons Dependencies (from /hive-lib/commons/)
RUN wget -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/commons-configuration/commons-configuration/1.10/commons-configuration-1.10.jar \
    https://repo1.maven.org/maven2/commons-lang/commons-lang/2.6/commons-lang-2.6.jar \
    https://repo1.maven.org/maven2/commons-collections/commons-collections/3.2.2/commons-collections-3.2.2.jar \
    https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar
# Disabled download of the shaded Hadoop 2 uber jar. The URL must stay commented
# out together with its RUN line; a bare URL line is not a valid instruction
# and aborts the build.
#RUN wget -P /opt/flink/lib/ \
#    https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
# Thrift fb303, Woodstox / StAX2 and the Hadoop client runtime
RUN wget -P /opt/flink/lib/ \
    https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar \
    https://repo1.maven.org/maven2/com/fasterxml/woodstox/woodstox-core/7.1.1/woodstox-core-7.1.1.jar \
    https://repo1.maven.org/maven2/org/codehaus/woodstox/stax2-api/4.2.2/stax2-api-4.2.2.jar \
    https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.4.2/hadoop-client-runtime-3.4.2.jar
# S3 filesystem plugins at the same version as Flink (1.20.3). Each plugin jar
# is placed in its own sub-directory of $FLINK_HOME/plugins.
RUN mkdir -p $FLINK_HOME/plugins/s3-fs-hadoop && \
    wget -P $FLINK_HOME/plugins/s3-fs-hadoop/ \
    https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.20.3/flink-s3-fs-hadoop-1.20.3.jar
RUN mkdir -p $FLINK_HOME/plugins/s3-fs-presto && \
    wget -P $FLINK_HOME/plugins/s3-fs-presto/ \
    https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-presto/1.20.3/flink-s3-fs-presto-1.20.3.jar
# Disabled: earlier attempt that put the S3 Hadoop filesystem jar into lib/
# instead of plugins/. The URL must stay commented out together with its RUN
# line; a bare URL line is not a valid instruction and aborts the build.
#RUN wget -P /opt/flink/lib/ \
#    https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.20.3/flink-s3-fs-hadoop-1.20.3.jar
# Hadoop 3.4.2 MapReduce client, HDFS and auth jars plus the Commons libraries
# downloaded alongside them.
RUN wget -P /opt/flink/lib/ \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-core/3.4.2/hadoop-mapreduce-client-core-3.4.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-common/3.4.2/hadoop-mapreduce-client-common-3.4.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-mapreduce-client-jobclient/3.4.2/hadoop-mapreduce-client-jobclient-3.4.2.jar \
    https://repo1.maven.org/maven2/org/apache/commons/commons-configuration2/2.9.0/commons-configuration2-2.9.0.jar \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-hdfs/3.4.2/hadoop-hdfs-3.4.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/commons/commons-collections4/4.5.0/commons-collections4-4.5.0.jar \
    https://repo.maven.apache.org/maven2/org/apache/hadoop/hadoop-auth/3.4.2/hadoop-auth-3.4.2.jar
# Iceberg Hive integration jars
RUN wget -P /opt/flink/lib/ \
    https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-hive-runtime/1.7.2/iceberg-hive-runtime-1.7.2.jar \
    https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-hive-metastore/1.9.2/iceberg-hive-metastore-1.9.2.jar
# NOTE(review): the URLs below were present only as orphaned continuation lines
# (a lone "\" followed by the URL) with no RUN/wget in front of them, which is
# not a valid instruction and aborts the build. They are kept here as comments;
# presumably they were disabled downloads - confirm whether any of them is
# actually required before re-enabling it.
#   https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs-client/3.4.2/hadoop-hdfs-client-3.4.2.jar
#   https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-hive/0.9.1/iceberg-hive-0.9.1.jar
#   https://repo.maven.apache.org/maven2/org/apache/thrift/libthrift/0.9.3/libthrift-0.9.3.jar
#   https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-hive3/1.7.2/iceberg-hive3-1.7.2.jar
#   https://repo1.maven.org/maven2/org/apache/flink/flink-core/1.20.3/flink-core-1.20.3.jar
#   https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-aws/1.9.2/iceberg-aws-1.9.2.jar
# Runtime environment: declare the Flink home and put its bin/ directory first on PATH.
ENV FLINK_HOME="/opt/flink"
ENV PATH="${FLINK_HOME}/bin:${PATH}"
# Healthcheck: every 30s request /overview on local port 8081 (10s timeout);
# the container is reported unhealthy after 3 consecutive failures.
HEALTHCHECK --interval=30s --timeout=10s --retries=3 \
    CMD curl -f http://localhost:8081/overview || exit 1
# Leave the working directory at the Flink installation root.
WORKDIR /opt/flink
```
Reporter: A S Rakesh Krishna
If a table in a Postgres database has a column of type TIMESTAMPTZ, the
Postgres CDC pipeline fails with the following error.
```
org.apache.flink.cdc.runtime.operators.sink.exception.SinkWrapperException:
Failed to handle event DataChangeEvent\{tableId=public.guest_messages,
before=null,
after=org.apache.flink.cdc.common.data.binary.BinaryRecordData@d8129cef,
op=INSERT, meta=()} in DataSink wrapper.
at
org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:185)
~[flink-cdc-dist-3.5.0.jar:3.5.0]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
~[flink-dist-1.20.3.jar:1.20.3]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
~[flink-dist-1.20.3.jar:1.20.3]
at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
~[flink-dist-1.20.3.jar:1.20.3]
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
~[flink-dist-1.20.3.jar:1.20.3]
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
~[flink-dist-1.20.3.jar:1.20.3]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
~[flink-dist-1.20.3.jar:1.20.3]
at
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
~[flink-dist-1.20.3.jar:1.20.3]
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
~[flink-dist-1.20.3.jar:1.20.3]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
~[flink-dist-1.20.3.jar:1.20.3]
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
~[flink-dist-1.20.3.jar:1.20.3]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
~[flink-dist-1.20.3.jar:1.20.3]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
~[flink-dist-1.20.3.jar:1.20.3]
at
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
~[flink-dist-1.20.3.jar:1.20.3]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
~[flink-dist-1.20.3.jar:1.20.3]
at java.base/java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.NumberFormatException: For input string:
"�ݩ���\{"message": "Hello! How can I assist you today?", "type": "text"}
at java.base/java.lang.NumberFormatException.forInputString(Unknown Source)
~[?:?]
at java.base/java.lang.Long.parseLong(Unknown Source) ~[?:?]
at java.base/java.lang.Long.parseLong(Unknown Source) ~[?:?]
at
org.apache.flink.cdc.common.data.binary.BinaryRecordData.getZonedTimestamp(BinaryRecordData.java:182)
~[flink-cdc-dist-3.5.0.jar:3.5.0]
at
org.apache.flink.cdc.connectors.iceberg.sink.utils.IcebergTypeUtils.lambda$createFieldGetter$56253627$3(IcebergTypeUtils.java:184)
~[flink-cdc-pipeline-connector-iceberg-3.5.0.jar:3.5.0]
at
org.apache.flink.cdc.connectors.iceberg.sink.utils.RowDataUtils.convertDataChangeEventToRowData(RowDataUtils.java:57)
~[flink-cdc-pipeline-connector-iceberg-3.5.0.jar:3.5.0]
at
org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergWriter.write(IcebergWriter.java:128)
~[flink-cdc-pipeline-connector-iceberg-3.5.0.jar:3.5.0]
at
org.apache.flink.cdc.connectors.iceberg.sink.v2.IcebergWriter.write(IcebergWriter.java:54)
~[flink-cdc-pipeline-connector-iceberg-3.5.0.jar:3.5.0]
at
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:166)
~[flink-dist-1.20.3.jar:1.20.3]
at
org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperator.processElement(DataSinkWriterOperator.java:183)
~[flink-cdc-dist-3.5.0.jar:3.5.0]
... 17 more
2025-11-24 10:34:24,977 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
6 tasks will be restarted to recover the failed task
7bcc05f0c397463a6b3a8efd8a997b67_0deb1b26a3d9eb3c8f0c11f7110b2903_1_0.
2025-11-24 10:34:24,977 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Sync Postgres
Database to Iceberg (16297cf52307ec4ddb514ce5dd0af71f) switched from state
RUNNING to RESTARTING.
2025-11-24 10:34:24,980 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - PostPartition ->
Sink Writer: Iceberg Sink (1/2)
(7bcc05f0c397463a6b3a8efd8a997b67_0deb1b26a3d9eb3c8f0c11f7110b2903_0_0)
switched from RUNNING to CANCELING.
2025-11-24 10:34:24,982 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink Committer:
Iceberg Sink (1/2)
(7bcc05f0c397463a6b3a8efd8a997b67_26351f8267c5887c12c827914f3a91a9_0_0)
switched from RUNNING to CANCELING.
2025-11-24 10:34:24,982 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink Committer:
Iceberg Sink (2/2)
(7bcc05f0c397463a6b3a8efd8a997b67_26351f8267c5887c12c827914f3a91a9_1_0)
switched from RUNNING to CANCELING.
```
--
This message was sent by Atlassian Jira
(v8.20.10#820010)