Yanquan Lv created FLINK-38835:
----------------------------------

             Summary: When using PostgreSQL CDC to read timestamp data with a 
value of 1900-01-01, abnormal data is produced
                 Key: FLINK-38835
                 URL: https://issues.apache.org/jira/browse/FLINK-38835
             Project: Flink
          Issue Type: Improvement
          Components: Flink CDC
    Affects Versions: cdc-3.5.0
            Reporter: Yanquan Lv


How to reproduce:
1) YAML job content
{code:java}
source:
  type: postgres
  hostname: xx
  port: xx
  username: xx
  password: xx
  tables: vvr_itcase.public.all_type_e2e_sink_1736305915787
  decoding.plugin.name: pgoutput
  slot.name: pgtest


sink:
  type: values
{code}
2) Table structure
{code:java}
CREATE TABLE public.all_type_e2e_sink_1736305915787 (
  id int4 NOT NULL,
  b bool NULL,
  c numeric NULL,
  d numeric NULL,
  e varchar NULL,
  f timestamp NULL,
  PRIMARY KEY (id)
);

INSERT INTO public.all_type_e2e_sink_1736305915787 (id, b, c, d, e, f)
VALUES (10, false, 10, 10, '1', '1900-01-01 00:00:00.000');
{code}
Then start running this YAML job.

Output during the snapshot phase (column f is read as 1900-01-01T00:05:43, although 1900-01-01 00:00:00.000 was inserted):
{code:java}
CreateTableEvent{tableId=public.all_type_e2e_sink_1736305915787, 
schema=columns={`id` INT NOT NULL,`b` BOOLEAN,`c` DECIMAL(38, 0),`d` 
DECIMAL(38, 0),`e` STRING,`f` TIMESTAMP(6)}, primaryKeys=id, options=()}

DataChangeEvent{tableId=public.all_type_e2e_sink_1736305915787, before=[], 
after=[10, false, 2560, 2560, 1, 1900-01-01T00:05:43], op=INSERT, meta=()} 
{code}
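Then update the record (here, b changes from false to true). Output during the incremental phase, where column f is read as 1900-01-01T00:00 and so differs from the snapshot value: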
{code:java}
DataChangeEvent{tableId=public.all_type_e2e_sink_1736305915787, before=[10, 
false, 2560, 2560, 1, 1900-01-01T00:00], after=[10, true, 2560, 2560, 1, 
1900-01-01T00:00], op=UPDATE, meta=()} {code}
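One possible explanation, offered as an assumption and not a confirmed root cause: the 5 minute 43 second shift in the snapshot output equals the local mean time offset (+08:05:43) that the tz database records for Asia/Shanghai before 1901. The report does not state the time zone of the job, so Asia/Shanghai is a guess. If a conversion on the snapshot path goes through the JVM zone rules, a timestamp of 1900-01-01 00:00:00 could be shifted by exactly that amount. The Java sketch below only demonstrates the offset gap. The class and method names are hypothetical and it is not Flink CDC code.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical helper for illustration; not part of Flink CDC or Debezium.
public class LmtOffsetCheck {

    // Returns how many seconds the zone's UTC offset at `historical`
    // exceeds its UTC offset at `modern`.
    public static int offsetGapSeconds(ZoneId zone, LocalDateTime historical, LocalDateTime modern) {
        ZoneOffset then = zone.getRules().getOffset(historical);
        ZoneOffset now = zone.getRules().getOffset(modern);
        return then.getTotalSeconds() - now.getTotalSeconds();
    }

    public static void main(String[] args) {
        // Assumption: the job ran with Asia/Shanghai as the local time zone.
        ZoneId zone = ZoneId.of("Asia/Shanghai");
        LocalDateTime reported = LocalDateTime.of(1900, 1, 1, 0, 0, 0);
        LocalDateTime recent = LocalDateTime.of(2020, 1, 1, 0, 0, 0);

        int gap = offsetGapSeconds(zone, reported, recent);
        System.out.println("offset at 1900-01-01: " + zone.getRules().getOffset(reported));
        System.out.println("gap vs. modern offset: " + (gap / 60) + "m " + (gap % 60) + "s");
    }
}
```

With the standard JDK time zone data, the gap for Asia/Shanghai is 343 seconds (5m 43s), which matches the 00:05:43 seen in the snapshot output. Running the same check with the actual time zone of the job would help confirm or rule out this explanation.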

--
This message was sent by Atlassian Jira
(v8.20.10#820010)