This is an automated email from the ASF dual-hosted git repository.

loserwang1024 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 26e6b73bd [FLINK-38742][cdc-pipeline/postgres] Fix TIMESTAMPTZ type mapping to TIMESTAMP_LTZ (#4181)
26e6b73bd is described below

commit 26e6b73bd4a4ab36d198e0d8e1beb55f843a0f0c
Author: A S Rakesh Krishna <[email protected]>
AuthorDate: Tue May 26 16:51:58 2026 +0530

    [FLINK-38742][cdc-pipeline/postgres] Fix TIMESTAMPTZ type mapping to TIMESTAMP_LTZ (#4181)
---
 .../postgres/utils/PostgresTypeUtils.java          |  5 ++---
 .../cdc/pipeline/tests/PostgresE2eITCase.java      | 24 +++++++++++-----------
 .../src/test/resources/ddl/postgres_inventory.sql  | 21 ++++++++++---------
 3 files changed, 25 insertions(+), 25 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
index 9362249c4..fcf017541 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
@@ -19,7 +19,6 @@ package org.apache.flink.cdc.connectors.postgres.utils;
 
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
-import org.apache.flink.cdc.common.types.ZonedTimestampType;
 import org.apache.flink.table.types.logical.DecimalType;
 
 import io.debezium.config.CommonConnectorConfig;
@@ -167,9 +166,9 @@ public class PostgresTypeUtils {
                 return DataTypes.ARRAY(
                         handleTimestampWithTemporalMode(temporalPrecisionMode, 
scale));
             case PgOid.TIMESTAMPTZ:
-                return new ZonedTimestampType(scale);
+                return DataTypes.TIMESTAMP_LTZ(scale);
             case PgOid.TIMESTAMPTZ_ARRAY:
-                return DataTypes.ARRAY(new ZonedTimestampType(scale));
+                return DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(scale));
             case PgOid.TIME:
                 return handleTimeWithTemporalMode(temporalPrecisionMode, 
scale);
             case PgOid.TIME_ARRAY:
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
index bcc70aff2..bb7bcd8c7 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
@@ -144,16 +144,16 @@ public class PostgresE2eITCase extends PipelineTestEnvironment {
                 "DataChangeEvent{tableId=%s.customers, before=[], after=[103, 
Edward, Walker, [email protected]], op=INSERT, meta=()}",
                 "DataChangeEvent{tableId=%s.customers, before=[], after=[102, 
George, Bailey, [email protected]], op=INSERT, meta=()}",
                 "DataChangeEvent{tableId=%s.customers, before=[], after=[101, 
Sally, Thomas, [email protected]], op=INSERT, meta=()}",
-                "CreateTableEvent{tableId=%s.products, schema=columns={`id` 
INT NOT NULL 'nextval('inventory.products_id_seq'::regclass)',`name` 
VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT}, 
primaryKeys=id, options=()}",
-                "DataChangeEvent{tableId=%s.products, before=[], after=[109, 
spare tire, 24 inch spare tire, 22.2], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.products, before=[], after=[107, 
rocks, box of assorted rocks, 5.3], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.products, before=[], after=[108, 
jacket, water resistent black wind breaker, 0.1], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.products, before=[], after=[105, 
hammer, 14oz carpenter's hammer, 0.875], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.products, before=[], after=[106, 
hammer, 16oz carpenter's hammer, 1.0], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.products, before=[], after=[103, 
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 
0.8], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.products, before=[], after=[104, 
hammer, 12oz carpenter's hammer, 0.75], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.products, before=[], after=[101, 
scooter, Small 2-wheel scooter, 3.14], op=INSERT, meta=()}",
-                "DataChangeEvent{tableId=%s.products, before=[], after=[102, 
car battery, 12V car battery, 8.1], op=INSERT, meta=()}");
+                "CreateTableEvent{tableId=%s.products, schema=columns={`id` 
INT NOT NULL 'nextval('inventory.products_id_seq'::regclass)',`name` 
VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT,`created_at` 
TIMESTAMP_LTZ(6)}, primaryKeys=id, options=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[109, 
spare tire, 24 inch spare tire, 22.2, 2024-01-09T18:00], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[107, 
rocks, box of assorted rocks, 5.3, 2024-01-07T16:45], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[108, 
jacket, water resistent black wind breaker, 0.1, 2024-01-08T17:00], op=INSERT, 
meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[105, 
hammer, 14oz carpenter's hammer, 0.875, 2024-01-05T14:20], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[106, 
hammer, 16oz carpenter's hammer, 1.0, 2024-01-06T15:30], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[103, 
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 
0.8, 2024-01-03T12:00], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[104, 
hammer, 12oz carpenter's hammer, 0.75, 2024-01-04T13:15], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[101, 
scooter, Small 2-wheel scooter, 3.14, 2024-01-01T10:00], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.products, before=[], after=[102, 
car battery, 12V car battery, 8.1, 2024-01-02T11:30], op=INSERT, meta=()}");
 
         LOG.info("Begin incremental reading stage.");
 
@@ -167,9 +167,9 @@ public class PostgresE2eITCase extends PipelineTestEnvironment {
 
             // Perform DML changes after the wal log is generated
             waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=inventory.products, before=[106, 
hammer, 16oz carpenter's hammer, 1.0], after=[106, hammer, 18oz carpenter 
hammer, 1.0], op=UPDATE, meta=()}");
+                    "DataChangeEvent{tableId=inventory.products, before=[106, 
hammer, 16oz carpenter's hammer, 1.0, 2024-01-06T15:30], after=[106, hammer, 
18oz carpenter hammer, 1.0, 2024-01-06T15:30], op=UPDATE, meta=()}");
             waitUntilSpecificEvent(
-                    "DataChangeEvent{tableId=inventory.products, before=[107, 
rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks, 
5.1], op=UPDATE, meta=()}");
+                    "DataChangeEvent{tableId=inventory.products, before=[107, 
rocks, box of assorted rocks, 5.3, 2024-01-07T16:45], after=[107, rocks, box of 
assorted rocks, 5.1, 2024-01-07T16:45], op=UPDATE, meta=()}");
         } catch (Exception e) {
             LOG.error("Update table for CDC failed.", e);
             throw new RuntimeException(e);
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/postgres_inventory.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/postgres_inventory.sql
index fb23e6639..8c9c18840 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/postgres_inventory.sql
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/postgres_inventory.sql
@@ -23,21 +23,22 @@ CREATE TABLE products (
                           id SERIAL NOT NULL PRIMARY KEY,
                           name VARCHAR(255) NOT NULL,
                           description VARCHAR(512),
-                          weight FLOAT(24)
+                          weight FLOAT(24),
+                          created_at TIMESTAMPTZ
 );
 ALTER SEQUENCE products_id_seq RESTART WITH 101;
 ALTER TABLE products REPLICA IDENTITY FULL;
 
 INSERT INTO products
-VALUES (default,'scooter','Small 2-wheel scooter',3.14),
-       (default,'car battery','12V car battery',8.1),
-       (default,'12-pack drill bits','12-pack of drill bits with sizes ranging 
from #40 to #3',0.8),
-       (default,'hammer','12oz carpenter''s hammer',0.75),
-       (default,'hammer','14oz carpenter''s hammer',0.875),
-       (default,'hammer','16oz carpenter''s hammer',1.0),
-       (default,'rocks','box of assorted rocks',5.3),
-       (default,'jacket','water resistent black wind breaker',0.1),
-       (default,'spare tire','24 inch spare tire',22.2);
+VALUES (default,'scooter','Small 2-wheel scooter',3.14,'2024-01-01 
10:00:00+00'),
+       (default,'car battery','12V car battery',8.1,'2024-01-02 11:30:00+00'),
+       (default,'12-pack drill bits','12-pack of drill bits with sizes ranging 
from #40 to #3',0.8,'2024-01-03 12:00:00+00'),
+       (default,'hammer','12oz carpenter''s hammer',0.75,'2024-01-04 
13:15:00+00'),
+       (default,'hammer','14oz carpenter''s hammer',0.875,'2024-01-05 
14:20:00+00'),
+       (default,'hammer','16oz carpenter''s hammer',1.0,'2024-01-06 
15:30:00+00'),
+       (default,'rocks','box of assorted rocks',5.3,'2024-01-07 16:45:00+00'),
+       (default,'jacket','water resistent black wind breaker',0.1,'2024-01-08 
17:00:00+00'),
+       (default,'spare tire','24 inch spare tire',22.2,'2024-01-09 
18:00:00+00');
 
 -- Create customers table
 CREATE TABLE customers (

Reply via email to