This is an automated email from the ASF dual-hosted git repository.
loserwang1024 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 26e6b73bd [FLINK-38742][cdc-pipeline/postgres] Fix TIMESTAMPTZ type
mapping to TIMESTAMP_LTZ (#4181)
26e6b73bd is described below
commit 26e6b73bd4a4ab36d198e0d8e1beb55f843a0f0c
Author: A S Rakesh Krishna <[email protected]>
AuthorDate: Tue May 26 16:51:58 2026 +0530
[FLINK-38742][cdc-pipeline/postgres] Fix TIMESTAMPTZ type mapping to
TIMESTAMP_LTZ (#4181)
---
.../postgres/utils/PostgresTypeUtils.java | 5 ++---
.../cdc/pipeline/tests/PostgresE2eITCase.java | 24 +++++++++++-----------
.../src/test/resources/ddl/postgres_inventory.sql | 21 ++++++++++---------
3 files changed, 25 insertions(+), 25 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
index 9362249c4..fcf017541 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/utils/PostgresTypeUtils.java
@@ -19,7 +19,6 @@ package org.apache.flink.cdc.connectors.postgres.utils;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
-import org.apache.flink.cdc.common.types.ZonedTimestampType;
import org.apache.flink.table.types.logical.DecimalType;
import io.debezium.config.CommonConnectorConfig;
@@ -167,9 +166,9 @@ public class PostgresTypeUtils {
return DataTypes.ARRAY(
handleTimestampWithTemporalMode(temporalPrecisionMode,
scale));
case PgOid.TIMESTAMPTZ:
- return new ZonedTimestampType(scale);
+ return DataTypes.TIMESTAMP_LTZ(scale);
case PgOid.TIMESTAMPTZ_ARRAY:
- return DataTypes.ARRAY(new ZonedTimestampType(scale));
+ return DataTypes.ARRAY(DataTypes.TIMESTAMP_LTZ(scale));
case PgOid.TIME:
return handleTimeWithTemporalMode(temporalPrecisionMode,
scale);
case PgOid.TIME_ARRAY:
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
index bcc70aff2..bb7bcd8c7 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java
@@ -144,16 +144,16 @@ public class PostgresE2eITCase extends
PipelineTestEnvironment {
"DataChangeEvent{tableId=%s.customers, before=[], after=[103,
Edward, Walker, [email protected]], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.customers, before=[], after=[102,
George, Bailey, [email protected]], op=INSERT, meta=()}",
"DataChangeEvent{tableId=%s.customers, before=[], after=[101,
Sally, Thomas, [email protected]], op=INSERT, meta=()}",
- "CreateTableEvent{tableId=%s.products, schema=columns={`id`
INT NOT NULL 'nextval('inventory.products_id_seq'::regclass)',`name`
VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT},
primaryKeys=id, options=()}",
- "DataChangeEvent{tableId=%s.products, before=[], after=[109,
spare tire, 24 inch spare tire, 22.2], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.products, before=[], after=[107,
rocks, box of assorted rocks, 5.3], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.products, before=[], after=[108,
jacket, water resistent black wind breaker, 0.1], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.products, before=[], after=[105,
hammer, 14oz carpenter's hammer, 0.875], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.products, before=[], after=[106,
hammer, 16oz carpenter's hammer, 1.0], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.products, before=[], after=[103,
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3,
0.8], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.products, before=[], after=[104,
hammer, 12oz carpenter's hammer, 0.75], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.products, before=[], after=[101,
scooter, Small 2-wheel scooter, 3.14], op=INSERT, meta=()}",
- "DataChangeEvent{tableId=%s.products, before=[], after=[102,
car battery, 12V car battery, 8.1], op=INSERT, meta=()}");
+ "CreateTableEvent{tableId=%s.products, schema=columns={`id`
INT NOT NULL 'nextval('inventory.products_id_seq'::regclass)',`name`
VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT,`created_at`
TIMESTAMP_LTZ(6)}, primaryKeys=id, options=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[109,
spare tire, 24 inch spare tire, 22.2, 2024-01-09T18:00], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[107,
rocks, box of assorted rocks, 5.3, 2024-01-07T16:45], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[108,
jacket, water resistent black wind breaker, 0.1, 2024-01-08T17:00], op=INSERT,
meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[105,
hammer, 14oz carpenter's hammer, 0.875, 2024-01-05T14:20], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[106,
hammer, 16oz carpenter's hammer, 1.0, 2024-01-06T15:30], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[103,
12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3,
0.8, 2024-01-03T12:00], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[104,
hammer, 12oz carpenter's hammer, 0.75, 2024-01-04T13:15], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[101,
scooter, Small 2-wheel scooter, 3.14, 2024-01-01T10:00], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[102,
car battery, 12V car battery, 8.1, 2024-01-02T11:30], op=INSERT, meta=()}");
LOG.info("Begin incremental reading stage.");
@@ -167,9 +167,9 @@ public class PostgresE2eITCase extends
PipelineTestEnvironment {
// Perform DML changes after the wal log is generated
waitUntilSpecificEvent(
- "DataChangeEvent{tableId=inventory.products, before=[106,
hammer, 16oz carpenter's hammer, 1.0], after=[106, hammer, 18oz carpenter
hammer, 1.0], op=UPDATE, meta=()}");
+ "DataChangeEvent{tableId=inventory.products, before=[106,
hammer, 16oz carpenter's hammer, 1.0, 2024-01-06T15:30], after=[106, hammer,
18oz carpenter hammer, 1.0, 2024-01-06T15:30], op=UPDATE, meta=()}");
waitUntilSpecificEvent(
- "DataChangeEvent{tableId=inventory.products, before=[107,
rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks,
5.1], op=UPDATE, meta=()}");
+ "DataChangeEvent{tableId=inventory.products, before=[107,
rocks, box of assorted rocks, 5.3, 2024-01-07T16:45], after=[107, rocks, box of
assorted rocks, 5.1, 2024-01-07T16:45], op=UPDATE, meta=()}");
} catch (Exception e) {
LOG.error("Update table for CDC failed.", e);
throw new RuntimeException(e);
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/postgres_inventory.sql
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/postgres_inventory.sql
index fb23e6639..8c9c18840 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/postgres_inventory.sql
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/ddl/postgres_inventory.sql
@@ -23,21 +23,22 @@ CREATE TABLE products (
id SERIAL NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
- weight FLOAT(24)
+ weight FLOAT(24),
+ created_at TIMESTAMPTZ
);
ALTER SEQUENCE products_id_seq RESTART WITH 101;
ALTER TABLE products REPLICA IDENTITY FULL;
INSERT INTO products
-VALUES (default,'scooter','Small 2-wheel scooter',3.14),
- (default,'car battery','12V car battery',8.1),
- (default,'12-pack drill bits','12-pack of drill bits with sizes ranging
from #40 to #3',0.8),
- (default,'hammer','12oz carpenter''s hammer',0.75),
- (default,'hammer','14oz carpenter''s hammer',0.875),
- (default,'hammer','16oz carpenter''s hammer',1.0),
- (default,'rocks','box of assorted rocks',5.3),
- (default,'jacket','water resistent black wind breaker',0.1),
- (default,'spare tire','24 inch spare tire',22.2);
+VALUES (default,'scooter','Small 2-wheel scooter',3.14,'2024-01-01
10:00:00+00'),
+ (default,'car battery','12V car battery',8.1,'2024-01-02 11:30:00+00'),
+ (default,'12-pack drill bits','12-pack of drill bits with sizes ranging
from #40 to #3',0.8,'2024-01-03 12:00:00+00'),
+ (default,'hammer','12oz carpenter''s hammer',0.75,'2024-01-04
13:15:00+00'),
+ (default,'hammer','14oz carpenter''s hammer',0.875,'2024-01-05
14:20:00+00'),
+ (default,'hammer','16oz carpenter''s hammer',1.0,'2024-01-06
15:30:00+00'),
+ (default,'rocks','box of assorted rocks',5.3,'2024-01-07 16:45:00+00'),
+ (default,'jacket','water resistent black wind breaker',0.1,'2024-01-08
17:00:00+00'),
+ (default,'spare tire','24 inch spare tire',22.2,'2024-01-09
18:00:00+00');
-- Create customers table
CREATE TABLE customers (