This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 58ab917024 [Fix][Connectors-Jdbc] Postgres supports streaming and
batch reading and writing of the `interval` data type (#9590)
58ab917024 is described below
commit 58ab917024de2196e300495c971cecdaa5b889c2
Author: chestnufang <[email protected]>
AuthorDate: Tue Jul 22 19:53:31 2025 +0800
[Fix][Connectors-Jdbc] Postgres supports streaming and batch reading and
writing of the `interval` data type (#9590)
Co-authored-by: chestnufang <[email protected]>
Co-authored-by: corgy-w <[email protected]>
---
.../dialect/psql/PostgresJdbcRowConverter.java | 33 ++++++++++++
.../dialect/psql/PostgresTypeConverter.java | 3 ++
.../dialect/psql/PostgresTypeConverterTest.java | 15 ++++++
.../seatunnel/cdc/postgres/PostgresCDCIT.java | 33 ++++++++++++
.../src/test/resources/ddl/inventory.sql | 24 +++++++++
...rescdc_to_postgres_with_interval_data_type.conf | 60 ++++++++++++++++++++++
6 files changed, 168 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index 7fbb2f7782..b8c1d21aa9 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -31,6 +31,8 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.Abstrac
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.JdbcFieldTypeUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+
import org.postgresql.util.PGobject;
import javax.annotation.Nullable;
@@ -43,6 +45,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
+import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@@ -50,6 +53,7 @@ import java.util.Locale;
import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INET;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INTERVAL;
public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {
@@ -189,6 +193,18 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
inetObject.setType(PG_INET);
inetObject.setValue(String.valueOf(row.getField(fieldIndex)));
statement.setObject(statementIndex, inetObject);
+ } else if (PG_INTERVAL.equalsIgnoreCase(sourceType)) {
+ PGobject intervalObject = new PGobject();
+ intervalObject.setType(PG_INTERVAL);
+ String intervalVal =
String.valueOf(row.getField(fieldIndex));
+ if (NumberUtils.isCreatable(intervalVal)) {
+ // postgres interval types are converted to
microseconds (long) in
+ // Debezium, so if it is a number,
+ // it is formatted as a postgres interval
value.
+ intervalVal =
microsecondsToIntervalFormatVal(intervalVal);
+ }
+ intervalObject.setValue(intervalVal);
+ statement.setObject(statementIndex,
intervalObject);
} else {
statement.setString(statementIndex, (String)
row.getField(fieldIndex));
}
@@ -270,4 +286,21 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
}
return statement;
}
+
+    /**
+     * Formats a signed microsecond count as a PostgreSQL {@code interval} input literal.
+     *
+     * <p>Called from the {@code PG_INTERVAL} branch of the statement setter when the upstream
+     * value is numeric. Per the comment at that call site, Debezium delivers PostgreSQL intervals
+     * as a microsecond count. For example, {@code "183600000000"} becomes {@code "2 days 3 hours"}.
+     * The result uses only day, hour, minute, second and microsecond units.
+     *
+     * <p>Edge cases:
+     *
+     * <ul>
+     *   <li>Zero yields {@code "0 seconds"}, because PostgreSQL rejects an empty interval string.
+     *   <li>Negative counts keep the sign on every non-zero unit, e.g. {@code "-1 days -2 hours"}.
+     *   <li>Sub-second precision is kept as a trailing {@code microseconds} term.
+     *   <li>Only long arithmetic is used, so the whole {@code long} range is accepted without
+     *       overflow.
+     * </ul>
+     *
+     * @param intervalVal base-10 string holding a signed microsecond count
+     * @return the equivalent PostgreSQL interval literal
+     * @throws NumberFormatException if {@code intervalVal} is not a base-10 {@code long} (for
+     *     example a decimal or hex literal)
+     */
+    public String microsecondsToIntervalFormatVal(String intervalVal) {
+        final long microsPerSecond = 1_000_000L;
+        final long microsPerMinute = 60L * microsPerSecond;
+        final long microsPerHour = 60L * microsPerMinute;
+        final long microsPerDay = 24L * microsPerHour;
+
+        long remaining = Long.parseLong(intervalVal);
+        if (remaining == 0) {
+            return "0 seconds";
+        }
+        // Java's / and % truncate toward zero, so every component keeps the sign of the input.
+        // No negation is needed, which would otherwise overflow for Long.MIN_VALUE.
+        long days = remaining / microsPerDay;
+        remaining %= microsPerDay;
+        long hours = remaining / microsPerHour;
+        remaining %= microsPerHour;
+        long minutes = remaining / microsPerMinute;
+        remaining %= microsPerMinute;
+        long seconds = remaining / microsPerSecond;
+        long micros = remaining % microsPerSecond;
+
+        StringBuilder sb = new StringBuilder();
+        appendIntervalUnit(sb, days, "days");
+        appendIntervalUnit(sb, hours, "hours");
+        appendIntervalUnit(sb, minutes, "minutes");
+        appendIntervalUnit(sb, seconds, "seconds");
+        appendIntervalUnit(sb, micros, "microseconds");
+        return sb.toString();
+    }
+
+    /** Appends {@code "<amount> <unit>"} to {@code sb}, space-separated, unless amount is 0. */
+    private static void appendIntervalUnit(StringBuilder sb, long amount, String unit) {
+        if (amount == 0) {
+            return;
+        }
+        if (sb.length() > 0) {
+            sb.append(' ');
+        }
+        sb.append(amount).append(' ').append(unit);
+    }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
index af2e55a4b4..3abb5126a3 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
@@ -95,6 +95,8 @@ public class PostgresTypeConverter implements
TypeConverter<BasicTypeDefine> {
private static final String PG_GEOMETRY = "geometry";
private static final String PG_GEOGRAPHY = "geography";
public static final String PG_DATE = "date";
+ // interval <=> STRING (see the PG_INTERVAL case in the type switch)
+ public static final String PG_INTERVAL = "interval";
+
// time without time zone <=> time
public static final String PG_TIME = "time";
// time with time zone <=> timetz
@@ -225,6 +227,7 @@ public class PostgresTypeConverter implements
TypeConverter<BasicTypeDefine> {
case PG_GEOMETRY:
case PG_GEOGRAPHY:
case PG_INET:
+ case PG_INTERVAL:
builder.dataType(BasicType.STRING_TYPE);
builder.sourceType(pgDataType);
break;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
index 1054bb33db..28c0b54a56 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
@@ -812,4 +812,19 @@ public class PostgresTypeConverterTest {
PostgresTypeConverter.PG_SMALLINT_ARRAY,
typeDefine.getColumnType());
Assertions.assertEquals(PostgresTypeConverter.PG_SMALLINT_ARRAY,
typeDefine.getDataType());
}
+
+    /**
+     * Verifies that a PostgreSQL {@code interval} column is converted to a STRING column that
+     * has no column length and keeps {@code interval} as its source type.
+     */
+    @Test
+    public void testConvertInterval() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("test")
+                        .columnType(PostgresTypeConverter.PG_INTERVAL)
+                        .dataType(PostgresTypeConverter.PG_INTERVAL)
+                        .build();
+        Column column = PostgresTypeConverter.INSTANCE.convert(typeDefine);
+
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+        Assertions.assertNull(column.getColumnLength());
+        Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType());
+    }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index 6be7bd9377..ed290bbbdd 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -111,9 +111,11 @@ public class PostgresCDCIT extends TestSuiteBase
implements TestResource {
private static final String SOURCE_TABLE_1 = "postgres_cdc_table_1";
private static final String SOURCE_TABLE_2 = "postgres_cdc_table_2";
private static final String SOURCE_TABLE_3 = "postgres_cdc_table_3";
+ // Source table with an INTERVAL column, used by testPostgresCdcCheckDataWithIntervalDataType
+ private static final String SOURCE_TABLE_4 = "postgres_cdc_table_4";
private static final String SINK_TABLE_1 = "sink_postgres_cdc_table_1";
private static final String SINK_TABLE_2 = "sink_postgres_cdc_table_2";
private static final String SINK_TABLE_3 = "sink_postgres_cdc_table_3";
+ // Sink table compared against SOURCE_TABLE_4 in testPostgresCdcCheckDataWithIntervalDataType
+ private static final String SINK_TABLE_4 = "sink_postgres_cdc_table_4";
private static final String SOURCE_TABLE_NO_PRIMARY_KEY =
"full_types_no_primary_key";
@@ -771,6 +773,37 @@ public class PostgresCDCIT extends TestSuiteBase
implements TestResource {
}
}
+    /**
+     * Runs the Postgres-CDC to JDBC job for the INTERVAL fixture table and waits until the sink
+     * table holds the same rows as the source table. Both tables are cleared afterwards.
+     */
+    @TestTemplate
+    public void testPostgresCdcCheckDataWithIntervalDataType(TestContainer container)
+            throws Exception {
+        try {
+            CompletableFuture.supplyAsync(
+                    () -> {
+                        try {
+                            container.executeJob(
+                                    "/postgrescdc_to_postgres_with_interval_data_type.conf");
+                        } catch (Exception e) {
+                            // Pass the throwable so the stack trace is logged, not just its message.
+                            log.error("Commit task exception", e);
+                            throw new RuntimeException(e);
+                        }
+                        return null;
+                    });
+
+            // The only source row comes from ddl/inventory.sql (presumably loaded before the job
+            // starts), so it reaches the sink via the initial read. Wait until both tables match.
+            await().atMost(60000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertIterableEquals(
+                                            query(getQuerySQL(POSTGRESQL_SCHEMA, SOURCE_TABLE_4)),
+                                            query(getQuerySQL(POSTGRESQL_SCHEMA, SINK_TABLE_4))));
+        } finally {
+            clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_4);
+            clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_4);
+        }
+    }
+
@Test
public void testDialectCheckDisabledCDCTable() throws SQLException {
JdbcSourceConfigFactory factory =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
index 59875092ef..095a593cb1 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
@@ -198,6 +198,24 @@ CREATE TABLE sink_postgres_cdc_table_3
PRIMARY KEY (id)
);
+-- INTERVAL round-trip fixtures: postgres_cdc_table_4 is captured by Postgres-CDC and
+-- sink_postgres_cdc_table_4 is written by the JDBC sink. PostgresCDCIT compares their rows,
+-- so the two column lists are meant to stay identical.
+CREATE TABLE postgres_cdc_table_4
+(
+ id INTEGER NOT NULL,
+ f_bytea BYTEA,
+ f_small SMALLINT,
+ f_interval INTERVAL,
+ PRIMARY KEY (id)
+);
+
+CREATE TABLE sink_postgres_cdc_table_4
+(
+ id INTEGER NOT NULL,
+ f_bytea BYTEA,
+ f_small SMALLINT,
+ f_interval INTERVAL,
+ PRIMARY KEY (id)
+);
+
ALTER TABLE postgres_cdc_table_1
REPLICA IDENTITY FULL;
@@ -207,6 +225,9 @@ ALTER TABLE postgres_cdc_table_2
ALTER TABLE postgres_cdc_table_3
REPLICA IDENTITY FULL;
+-- Same replica identity setting as the other CDC source tables (postgres_cdc_table_1..3).
+ALTER TABLE postgres_cdc_table_4
+ REPLICA IDENTITY FULL;
+
ALTER TABLE sink_postgres_cdc_table_1
REPLICA IDENTITY FULL;
@@ -232,6 +253,9 @@ VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6,
123.12345, 404.4443, true,
INSERT INTO postgres_cdc_table_3
VALUES (1, '2', 32767, 65535);
+-- Seed row for the INTERVAL test: f_interval = 2 days 3 hours.
+INSERT INTO postgres_cdc_table_4
+VALUES (1, '2', 32767, INTERVAL '2 days 3 hours');
+
INSERT INTO full_types_no_primary_key
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_interval_data_type.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_interval_data_type.conf
new file mode 100644
index 0000000000..64cd3de34a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_interval_data_type.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### Streams inventory.postgres_cdc_table_4 (which includes an INTERVAL column) from
+###### Postgres-CDC into inventory.sink_postgres_cdc_table_4 through the JDBC sink
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ Postgres-CDC {
+ plugin_output = "customers_postgres_cdc"
+ username = "postgres"
+ password = "postgres"
+ database-names = ["postgres_cdc"]
+ schema-names = ["inventory"]
+ # Only the fixture table with the INTERVAL column is captured
+ table-names = ["postgres_cdc.inventory.postgres_cdc_table_4"]
+ base-url =
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+ }
+}
+
+# No transform: rows flow from the CDC source straight to the sink
+transform {
+
+}
+
+sink {
+ jdbc {
+ # Consumes the stream registered above as plugin_output "customers_postgres_cdc"
+ plugin_input = "customers_postgres_cdc"
+ url =
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+ driver = "org.postgresql.Driver"
+ user = "postgres"
+ password = "postgres"
+
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = postgres_cdc
+ # Target table that PostgresCDCIT compares against the source table
+ table = inventory.sink_postgres_cdc_table_4
+ primary_keys = ["id"]
+ }
+}