Yanquan Lv created FLINK-38835:
----------------------------------
Summary: When using postgresql cdc to read timestamp data with a
value of 1900-01-01, abnormal data occurred
Key: FLINK-38835
URL: https://issues.apache.org/jira/browse/FLINK-38835
Project: Flink
Issue Type: Improvement
Components: Flink CDC
Affects Versions: cdc-3.5.0
Reporter: Yanquan Lv
How to reproduce:
1) YAML job content
{code:java}
source:
type: postgres
hostname: xx
port: xx
username: xx
password: xx
tables: vvr_itcase.public.all_type_e2e_sink_1736305915787
decoding.plugin.name: pgoutput
slot.name: pgtest
sink:
type: values {code}
2) Table structure
{code:java}
CREATE TABLE public.all_type_e2e_sink_1736305915787
(id int4 NOT NULL,b bool NULL,c numeric NULL,d numeric NULL,e varchar NULL,f
timestamp NULL,PRIMARY KEY (id));
INSERT INTO public.all_type_e2e_sink_1736305915787(id, b, c, d, e, f)VALUES(10,
false, 10, 10, '1', '1900-01-01 00:00:00.000');{code}
Then start running this YAML job.
The result output during the snapshot phase:
{code:java}
CreateTableEvent{tableId=public.all_type_e2e_sink_1736305915787,
schema=columns={`id` INT NOT NULL,`b` BOOLEAN,`c` DECIMAL(38, 0),`d`
DECIMAL(38, 0),`e` STRING,`f` TIMESTAMP(6)}, primaryKeys=id, options=()}
DataChangeEvent{tableId=public.all_type_e2e_sink_1736305915787, before=[],
after=[10, false, 2560, 2560, 1, 1900-01-01T00:05:43], op=INSERT, meta=()}
{code}
then update a record, and the result output during the incremental phase:
{code:java}
DataChangeEvent{tableId=public.all_type_e2e_sink_1736305915787, before=[10,
false, 2560, 2560, 1, 1900-01-01T00:00], after=[10, true, 2560, 2560, 1,
1900-01-01T00:00], op=UPDATE, meta=()} {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)