This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 58ab917024 [Fix][Connectors-Jdbc] Postgres supports streaming and 
batch reading and writing of the `interval` data type (#9590)
58ab917024 is described below

commit 58ab917024de2196e300495c971cecdaa5b889c2
Author: chestnufang <[email protected]>
AuthorDate: Tue Jul 22 19:53:31 2025 +0800

    [Fix][Connectors-Jdbc] Postgres supports streaming and batch reading and 
writing of the `interval` data type (#9590)
    
    Co-authored-by: chestnufang <[email protected]>
    Co-authored-by: corgy-w <[email protected]>
---
 .../dialect/psql/PostgresJdbcRowConverter.java     | 33 ++++++++++++
 .../dialect/psql/PostgresTypeConverter.java        |  3 ++
 .../dialect/psql/PostgresTypeConverterTest.java    | 15 ++++++
 .../seatunnel/cdc/postgres/PostgresCDCIT.java      | 33 ++++++++++++
 .../src/test/resources/ddl/inventory.sql           | 24 +++++++++
 ...rescdc_to_postgres_with_interval_data_type.conf | 60 ++++++++++++++++++++++
 6 files changed, 168 insertions(+)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index 7fbb2f7782..b8c1d21aa9 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -31,6 +31,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.Abstrac
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
 
+import org.apache.commons.lang3.math.NumberUtils;
+
 import org.postgresql.util.PGobject;
 
 import javax.annotation.Nullable;
@@ -43,6 +45,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Time;
 import java.sql.Timestamp;
+import java.time.Duration;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
@@ -50,6 +53,7 @@ import java.util.Locale;
 import java.util.Optional;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INET;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INTERVAL;
 
 public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {
 
@@ -189,6 +193,18 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
                             inetObject.setType(PG_INET);
                             
inetObject.setValue(String.valueOf(row.getField(fieldIndex)));
                             statement.setObject(statementIndex, inetObject);
+                        } else if (PG_INTERVAL.equalsIgnoreCase(sourceType)) {
+                            PGobject intervalObject = new PGobject();
+                            intervalObject.setType(PG_INTERVAL);
+                            String intervalVal = 
String.valueOf(row.getField(fieldIndex));
+                            if (NumberUtils.isCreatable(intervalVal)) {
+                                // postgres interval types are converted to 
microseconds (long) in
+                                // Debezium, so if it is a number,
+                                // it is formatted as a postgres interval 
value.
+                                intervalVal = 
microsecondsToIntervalFormatVal(intervalVal);
+                            }
+                            intervalObject.setValue(intervalVal);
+                            statement.setObject(statementIndex, 
intervalObject);
                         } else {
                             statement.setString(statementIndex, (String) 
row.getField(fieldIndex));
                         }
@@ -270,4 +286,21 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
         }
         return statement;
     }
+
+    public String microsecondsToIntervalFormatVal(String intervalVal) {
+        Duration duration = Duration.ofNanos(Long.parseLong(intervalVal) * 
1000);
+        int days = (int) duration.toDays();
+        duration = duration.minusDays(days);
+        int hours = (int) duration.toHours();
+        duration = duration.minusHours(hours);
+        int minutes = (int) duration.toMinutes();
+        duration = duration.minusMinutes(minutes);
+        int seconds = (int) duration.getSeconds();
+        StringBuilder sb = new StringBuilder();
+        if (days > 0) sb.append(days).append(" days ");
+        if (hours > 0) sb.append(hours).append(" hours ");
+        if (minutes > 0) sb.append(minutes).append(" minutes ");
+        if (seconds > 0) sb.append(seconds).append(" seconds");
+        return sb.toString().trim();
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
index af2e55a4b4..3abb5126a3 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
@@ -95,6 +95,8 @@ public class PostgresTypeConverter implements 
TypeConverter<BasicTypeDefine> {
     private static final String PG_GEOMETRY = "geometry";
     private static final String PG_GEOGRAPHY = "geography";
     public static final String PG_DATE = "date";
+    public static final String PG_INTERVAL = "interval";
+
     // time without time zone <=> time
     public static final String PG_TIME = "time";
     // time with time zone <=> timetz
@@ -225,6 +227,7 @@ public class PostgresTypeConverter implements 
TypeConverter<BasicTypeDefine> {
             case PG_GEOMETRY:
             case PG_GEOGRAPHY:
             case PG_INET:
+            case PG_INTERVAL:
                 builder.dataType(BasicType.STRING_TYPE);
                 builder.sourceType(pgDataType);
                 break;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
index 1054bb33db..28c0b54a56 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
@@ -812,4 +812,19 @@ public class PostgresTypeConverterTest {
                 PostgresTypeConverter.PG_SMALLINT_ARRAY, 
typeDefine.getColumnType());
         Assertions.assertEquals(PostgresTypeConverter.PG_SMALLINT_ARRAY, 
typeDefine.getDataType());
     }
+
+    @Test
+    public void testConvertInterval() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType("interval")
+                        .dataType("interval")
+                        .build();
+        Column column = PostgresTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+        Assertions.assertEquals(null, column.getColumnLength());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index 6be7bd9377..ed290bbbdd 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -111,9 +111,11 @@ public class PostgresCDCIT extends TestSuiteBase 
implements TestResource {
     private static final String SOURCE_TABLE_1 = "postgres_cdc_table_1";
     private static final String SOURCE_TABLE_2 = "postgres_cdc_table_2";
     private static final String SOURCE_TABLE_3 = "postgres_cdc_table_3";
+    private static final String SOURCE_TABLE_4 = "postgres_cdc_table_4";
     private static final String SINK_TABLE_1 = "sink_postgres_cdc_table_1";
     private static final String SINK_TABLE_2 = "sink_postgres_cdc_table_2";
     private static final String SINK_TABLE_3 = "sink_postgres_cdc_table_3";
+    private static final String SINK_TABLE_4 = "sink_postgres_cdc_table_4";
 
     private static final String SOURCE_TABLE_NO_PRIMARY_KEY = 
"full_types_no_primary_key";
 
@@ -771,6 +773,37 @@ public class PostgresCDCIT extends TestSuiteBase 
implements TestResource {
         }
     }
 
+    @TestTemplate
+    public void testPostgresCdcCheckDataWithIntervalDataType(TestContainer 
container)
+            throws Exception {
+
+        try {
+            CompletableFuture.supplyAsync(
+                    () -> {
+                        try {
+                            container.executeJob(
+                                    
"/postgrescdc_to_postgres_with_interval_data_type.conf");
+                        } catch (Exception e) {
+                            log.error("Commit task exception :" + 
e.getMessage());
+                            throw new RuntimeException(e);
+                        }
+                        return null;
+                    });
+
+            // stream stage
+            await().atMost(60000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                Assertions.assertIterableEquals(
+                                        query(getQuerySQL(POSTGRESQL_SCHEMA, 
SOURCE_TABLE_4)),
+                                        query(getQuerySQL(POSTGRESQL_SCHEMA, 
SINK_TABLE_4)));
+                            });
+        } finally {
+            clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_4);
+            clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_4);
+        }
+    }
+
     @Test
     public void testDialectCheckDisabledCDCTable() throws SQLException {
         JdbcSourceConfigFactory factory =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
index 59875092ef..095a593cb1 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
@@ -198,6 +198,24 @@ CREATE TABLE sink_postgres_cdc_table_3
     PRIMARY KEY (id)
 );
 
+CREATE TABLE postgres_cdc_table_4
+(
+    id                  INTEGER NOT NULL,
+    f_bytea             BYTEA,
+    f_small             SMALLINT,
+    f_interval          INTERVAL,
+    PRIMARY KEY (id)
+);
+
+CREATE TABLE sink_postgres_cdc_table_4
+(
+    id                  INTEGER NOT NULL,
+    f_bytea             BYTEA,
+    f_small             SMALLINT,
+    f_interval          INTERVAL,
+    PRIMARY KEY (id)
+);
+
 ALTER TABLE postgres_cdc_table_1
     REPLICA IDENTITY FULL;
 
@@ -207,6 +225,9 @@ ALTER TABLE postgres_cdc_table_2
 ALTER TABLE postgres_cdc_table_3
     REPLICA IDENTITY FULL;
 
+ALTER TABLE postgres_cdc_table_4
+    REPLICA IDENTITY FULL;
+
 ALTER TABLE sink_postgres_cdc_table_1
     REPLICA IDENTITY FULL;
 
@@ -232,6 +253,9 @@ VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 
123.12345, 404.4443, true,
 INSERT INTO postgres_cdc_table_3
 VALUES (1, '2', 32767, 65535);
 
+INSERT INTO postgres_cdc_table_4
+VALUES (1, '2', 32767, INTERVAL '2 days 3 hours');
+
 INSERT INTO full_types_no_primary_key
 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
         'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', 
'2020-07-17 18:00:22.123456',
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_interval_data_type.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_interval_data_type.conf
new file mode 100644
index 0000000000..64cd3de34a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_interval_data_type.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  Postgres-CDC {
+    plugin_output = "customers_postgres_cdc"
+    username = "postgres"
+    password = "postgres"
+    database-names = ["postgres_cdc"]
+    schema-names = ["inventory"]
+    table-names = ["postgres_cdc.inventory.postgres_cdc_table_4"]
+    base-url = 
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+  }
+}
+
+transform {
+
+}
+
+sink {
+  jdbc {
+    plugin_input = "customers_postgres_cdc"
+    url = 
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+    driver = "org.postgresql.Driver"
+    user = "postgres"
+    password = "postgres"
+
+    generate_sink_sql = true
+    # You need to configure both database and table
+    database = postgres_cdc
+    table = inventory.sink_postgres_cdc_table_4
+    primary_keys = ["id"]
+  }
+}

Reply via email to