This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3dc79c1ddf [Fix][Connector-Jdbc] Supports reading and writing Postgres
network address types (#9618)
3dc79c1ddf is described below
commit 3dc79c1ddfbaec93120bfbaf1189aaa49363bd41
Author: chestnufang <[email protected]>
AuthorDate: Mon Jul 28 10:09:10 2025 +0800
[Fix][Connector-Jdbc] Supports reading and writing Postgres network address
types (#9618)
---
.../dialect/psql/PostgresJdbcRowConverter.java | 17 ++++--
.../dialect/psql/PostgresTypeConverter.java | 6 +++
.../dialect/psql/PostgresTypeConverterTest.java | 35 +++++++++++++
.../seatunnel/cdc/postgres/PostgresCDCIT.java | 31 +++++++++++
.../src/test/resources/ddl/inventory.sql | 32 ++++++++++++
...cdc_to_postgres_with_network_address_types.conf | 60 ++++++++++++++++++++++
6 files changed, 176 insertions(+), 5 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index b8c1d21aa9..6866e428e4 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -52,8 +52,11 @@ import java.time.LocalTime;
import java.util.Locale;
import java.util.Optional;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_CIDR;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INET;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INTERVAL;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_MAC_ADDR;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_MAC_ADDR8;
public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {
@@ -188,11 +191,15 @@ public class PostgresJdbcRowConverter extends
AbstractJdbcRowConverter {
switch (seaTunnelDataType.getSqlType()) {
case STRING:
String sourceType = sourceTypes[fieldIndex];
- if (PG_INET.equalsIgnoreCase(sourceType)) {
- PGobject inetObject = new PGobject();
- inetObject.setType(PG_INET);
-
inetObject.setValue(String.valueOf(row.getField(fieldIndex)));
- statement.setObject(statementIndex, inetObject);
+ if (PG_INET.equalsIgnoreCase(sourceType)
+ || PG_CIDR.equalsIgnoreCase(sourceType)
+ || PG_MAC_ADDR.equalsIgnoreCase(sourceType)
+ || PG_MAC_ADDR8.equalsIgnoreCase(sourceType)) {
+ // handle network address types of postgres
+ PGobject networkTypeObject = new PGobject();
+ networkTypeObject.setType(sourceType);
+
networkTypeObject.setValue(String.valueOf(row.getField(fieldIndex)));
+ statement.setObject(statementIndex,
networkTypeObject);
} else if (PG_INTERVAL.equalsIgnoreCase(sourceType)) {
PGobject intervalObject = new PGobject();
intervalObject.setType(PG_INTERVAL);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
index 3abb5126a3..f472d3bce5 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
@@ -83,6 +83,9 @@ public class PostgresTypeConverter implements
TypeConverter<BasicTypeDefine> {
// character varying <=> varchar
public static final String PG_VARCHAR = "varchar";
public static final String PG_INET = "inet";
+ public static final String PG_CIDR = "cidr";
+ public static final String PG_MAC_ADDR = "macaddr";
+ public static final String PG_MAC_ADDR8 = "macaddr8";
public static final String PG_CHARACTER_VARYING = "character varying";
// character varying[] <=> varchar[] <=> _varchar
public static final String PG_VARCHAR_ARRAY = "_varchar";
@@ -228,6 +231,9 @@ public class PostgresTypeConverter implements
TypeConverter<BasicTypeDefine> {
case PG_GEOGRAPHY:
case PG_INET:
case PG_INTERVAL:
+ case PG_CIDR:
+ case PG_MAC_ADDR:
+ case PG_MAC_ADDR8:
builder.dataType(BasicType.STRING_TYPE);
builder.sourceType(pgDataType);
break;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
index 28c0b54a56..ce48ddab64 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
@@ -827,4 +827,39 @@ public class PostgresTypeConverterTest {
Assertions.assertEquals(null, column.getColumnLength());
Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
}
+
+ @Test
+ public void testConvertNetworkAddressTypes() {
+ BasicTypeDefine<Object> typeDefine =
+
BasicTypeDefine.builder().name("test").columnType("cidr").dataType("cidr").build();
+ Column column = PostgresTypeConverter.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals(typeDefine.getName(), column.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+ Assertions.assertNull(column.getColumnLength());
+ Assertions.assertEquals(typeDefine.getColumnType(),
column.getSourceType());
+
+ BasicTypeDefine<Object> typeDefine1 =
+ BasicTypeDefine.builder()
+ .name("test1")
+ .columnType("macaddr")
+ .dataType("macaddr")
+ .build();
+ Column column1 = PostgresTypeConverter.INSTANCE.convert(typeDefine1);
+ Assertions.assertEquals(typeDefine1.getName(), column1.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column1.getDataType());
+ Assertions.assertNull(column1.getColumnLength());
+ Assertions.assertEquals(typeDefine1.getColumnType(),
column1.getSourceType());
+
+ BasicTypeDefine<Object> typeDefine2 =
+ BasicTypeDefine.builder()
+ .name("test2")
+ .columnType("macaddr8")
+ .dataType("macaddr8")
+ .build();
+ Column column2 = PostgresTypeConverter.INSTANCE.convert(typeDefine2);
+ Assertions.assertEquals(typeDefine2.getName(), column2.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, column2.getDataType());
+ Assertions.assertNull(column2.getColumnLength());
+ Assertions.assertEquals(typeDefine2.getColumnType(),
column2.getSourceType());
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index ed290bbbdd..d6c42de2af 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -112,10 +112,12 @@ public class PostgresCDCIT extends TestSuiteBase
implements TestResource {
private static final String SOURCE_TABLE_2 = "postgres_cdc_table_2";
private static final String SOURCE_TABLE_3 = "postgres_cdc_table_3";
private static final String SOURCE_TABLE_4 = "postgres_cdc_table_4";
+ private static final String SOURCE_TABLE_5 = "postgres_cdc_table_5";
private static final String SINK_TABLE_1 = "sink_postgres_cdc_table_1";
private static final String SINK_TABLE_2 = "sink_postgres_cdc_table_2";
private static final String SINK_TABLE_3 = "sink_postgres_cdc_table_3";
private static final String SINK_TABLE_4 = "sink_postgres_cdc_table_4";
+ private static final String SINK_TABLE_5 = "sink_postgres_cdc_table_5";
private static final String SOURCE_TABLE_NO_PRIMARY_KEY =
"full_types_no_primary_key";
@@ -804,6 +806,35 @@ public class PostgresCDCIT extends TestSuiteBase
implements TestResource {
}
}
+ @TestTemplate
+ public void testPostgresCdcCheckDataWithNetworkAddressTypes(TestContainer
container) {
+ try {
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob(
+
"/postgrescdc_to_postgres_with_network_address_types.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" +
e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ // stream stage
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertIterableEquals(
+ query(getQuerySQL(POSTGRESQL_SCHEMA,
SOURCE_TABLE_5)),
+ query(getQuerySQL(POSTGRESQL_SCHEMA,
SINK_TABLE_5)));
+ });
+ } finally {
+ clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_5);
+ clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_5);
+ }
+ }
+
@Test
public void testDialectCheckDisabledCDCTable() throws SQLException {
JdbcSourceConfigFactory factory =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
index 095a593cb1..c823ac5a3d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
@@ -216,6 +216,32 @@ CREATE TABLE sink_postgres_cdc_table_4
PRIMARY KEY (id)
);
+CREATE TABLE postgres_cdc_table_5
+(
+ id INTEGER NOT NULL,
+ f_bytea BYTEA,
+ f_small SMALLINT,
+ f_interval INTERVAL,
+ ip INET,
+ network CIDR,
+ mac MACADDR,
+ mac8 MACADDR8,
+ PRIMARY KEY (id)
+);
+
+CREATE TABLE sink_postgres_cdc_table_5
+(
+ id INTEGER NOT NULL,
+ f_bytea BYTEA,
+ f_small SMALLINT,
+ f_interval INTERVAL,
+ ip INET,
+ network CIDR,
+ mac MACADDR,
+ mac8 MACADDR8,
+ PRIMARY KEY (id)
+);
+
ALTER TABLE postgres_cdc_table_1
REPLICA IDENTITY FULL;
@@ -228,6 +254,9 @@ ALTER TABLE postgres_cdc_table_3
ALTER TABLE postgres_cdc_table_4
REPLICA IDENTITY FULL;
+ALTER TABLE postgres_cdc_table_5
+ REPLICA IDENTITY FULL;
+
ALTER TABLE sink_postgres_cdc_table_1
REPLICA IDENTITY FULL;
@@ -256,6 +285,9 @@ VALUES (1, '2', 32767, 65535);
INSERT INTO postgres_cdc_table_4
VALUES (1, '2', 32767, INTERVAL '2 days 3 hours');
+INSERT INTO postgres_cdc_table_5 (id, f_bytea, f_small, f_interval, ip,
network, mac, mac8)
+VALUES (1, '2', 32767, INTERVAL '1 day 2 hours', '192.168.1.100',
'192.168.1.0/24', '08:00:2b:01:02:03', '08:00:2b:01:02:03:04:05');
+
INSERT INTO full_types_no_primary_key
VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123',
'2020-07-17 18:00:22.123456',
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_network_address_types.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_network_address_types.conf
new file mode 100644
index 0000000000..6c096bca14
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_network_address_types.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ read_limit.bytes_per_second=7000000
+ read_limit.rows_per_second=400
+}
+
+source {
+ Postgres-CDC {
+ plugin_output = "customers_postgres_cdc"
+ username = "postgres"
+ password = "postgres"
+ database-names = ["postgres_cdc"]
+ schema-names = ["inventory"]
+ table-names = ["postgres_cdc.inventory.postgres_cdc_table_5"]
+ base-url =
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+ }
+}
+
+transform {
+
+}
+
+sink {
+ jdbc {
+ plugin_input = "customers_postgres_cdc"
+ url =
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+ driver = "org.postgresql.Driver"
+ user = "postgres"
+ password = "postgres"
+
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = postgres_cdc
+ table = inventory.sink_postgres_cdc_table_5
+ primary_keys = ["id"]
+ }
+}