This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3dc79c1ddf [Fix][Connector-Jdbc] Supports reading and writing Postgres 
network address types (#9618)
3dc79c1ddf is described below

commit 3dc79c1ddfbaec93120bfbaf1189aaa49363bd41
Author: chestnufang <[email protected]>
AuthorDate: Mon Jul 28 10:09:10 2025 +0800

    [Fix][Connector-Jdbc] Supports reading and writing Postgres network address 
types (#9618)
---
 .../dialect/psql/PostgresJdbcRowConverter.java     | 17 ++++--
 .../dialect/psql/PostgresTypeConverter.java        |  6 +++
 .../dialect/psql/PostgresTypeConverterTest.java    | 35 +++++++++++++
 .../seatunnel/cdc/postgres/PostgresCDCIT.java      | 31 +++++++++++
 .../src/test/resources/ddl/inventory.sql           | 32 ++++++++++++
 ...cdc_to_postgres_with_network_address_types.conf | 60 ++++++++++++++++++++++
 6 files changed, 176 insertions(+), 5 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
index b8c1d21aa9..6866e428e4 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresJdbcRowConverter.java
@@ -52,8 +52,11 @@ import java.time.LocalTime;
 import java.util.Locale;
 import java.util.Optional;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_CIDR;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INET;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_INTERVAL;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_MAC_ADDR;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeConverter.PG_MAC_ADDR8;
 
 public class PostgresJdbcRowConverter extends AbstractJdbcRowConverter {
 
@@ -188,11 +191,15 @@ public class PostgresJdbcRowConverter extends 
AbstractJdbcRowConverter {
                 switch (seaTunnelDataType.getSqlType()) {
                     case STRING:
                         String sourceType = sourceTypes[fieldIndex];
-                        if (PG_INET.equalsIgnoreCase(sourceType)) {
-                            PGobject inetObject = new PGobject();
-                            inetObject.setType(PG_INET);
-                            
inetObject.setValue(String.valueOf(row.getField(fieldIndex)));
-                            statement.setObject(statementIndex, inetObject);
+                        if (PG_INET.equalsIgnoreCase(sourceType)
+                                || PG_CIDR.equalsIgnoreCase(sourceType)
+                                || PG_MAC_ADDR.equalsIgnoreCase(sourceType)
+                                || PG_MAC_ADDR8.equalsIgnoreCase(sourceType)) {
+                            // handle network address types of postgres
+                            PGobject networkTypeObject = new PGobject();
+                            networkTypeObject.setType(sourceType);
+                            
networkTypeObject.setValue(String.valueOf(row.getField(fieldIndex)));
+                            statement.setObject(statementIndex, 
networkTypeObject);
                         } else if (PG_INTERVAL.equalsIgnoreCase(sourceType)) {
                             PGobject intervalObject = new PGobject();
                             intervalObject.setType(PG_INTERVAL);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
index 3abb5126a3..f472d3bce5 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverter.java
@@ -83,6 +83,9 @@ public class PostgresTypeConverter implements 
TypeConverter<BasicTypeDefine> {
     // character varying <=> varchar
     public static final String PG_VARCHAR = "varchar";
     public static final String PG_INET = "inet";
+    public static final String PG_CIDR = "cidr";
+    public static final String PG_MAC_ADDR = "macaddr";
+    public static final String PG_MAC_ADDR8 = "macaddr8";
     public static final String PG_CHARACTER_VARYING = "character varying";
     // character varying[] <=> varchar[] <=> _varchar
     public static final String PG_VARCHAR_ARRAY = "_varchar";
@@ -228,6 +231,9 @@ public class PostgresTypeConverter implements 
TypeConverter<BasicTypeDefine> {
             case PG_GEOGRAPHY:
             case PG_INET:
             case PG_INTERVAL:
+            case PG_CIDR:
+            case PG_MAC_ADDR:
+            case PG_MAC_ADDR8:
                 builder.dataType(BasicType.STRING_TYPE);
                 builder.sourceType(pgDataType);
                 break;
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
index 28c0b54a56..ce48ddab64 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeConverterTest.java
@@ -827,4 +827,39 @@ public class PostgresTypeConverterTest {
         Assertions.assertEquals(null, column.getColumnLength());
         Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
     }
+
+    @Test
+    public void testConvertNetworkAddressTypes() {
+        BasicTypeDefine<Object> typeDefine =
+                
BasicTypeDefine.builder().name("test").columnType("cidr").dataType("cidr").build();
+        Column column = PostgresTypeConverter.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals(typeDefine.getName(), column.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType());
+        Assertions.assertNull(column.getColumnLength());
+        Assertions.assertEquals(typeDefine.getColumnType(), 
column.getSourceType());
+
+        BasicTypeDefine<Object> typeDefine1 =
+                BasicTypeDefine.builder()
+                        .name("test1")
+                        .columnType("macaddr")
+                        .dataType("macaddr")
+                        .build();
+        Column column1 = PostgresTypeConverter.INSTANCE.convert(typeDefine1);
+        Assertions.assertEquals(typeDefine1.getName(), column1.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, column1.getDataType());
+        Assertions.assertNull(column1.getColumnLength());
+        Assertions.assertEquals(typeDefine1.getColumnType(), 
column1.getSourceType());
+
+        BasicTypeDefine<Object> typeDefine2 =
+                BasicTypeDefine.builder()
+                        .name("test2")
+                        .columnType("macaddr8")
+                        .dataType("macaddr8")
+                        .build();
+        Column column2 = PostgresTypeConverter.INSTANCE.convert(typeDefine2);
+        Assertions.assertEquals(typeDefine2.getName(), column2.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, column2.getDataType());
+        Assertions.assertNull(column2.getColumnLength());
+        Assertions.assertEquals(typeDefine2.getColumnType(), 
column2.getSourceType());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index ed290bbbdd..d6c42de2af 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -112,10 +112,12 @@ public class PostgresCDCIT extends TestSuiteBase 
implements TestResource {
     private static final String SOURCE_TABLE_2 = "postgres_cdc_table_2";
     private static final String SOURCE_TABLE_3 = "postgres_cdc_table_3";
     private static final String SOURCE_TABLE_4 = "postgres_cdc_table_4";
+    private static final String SOURCE_TABLE_5 = "postgres_cdc_table_5";
     private static final String SINK_TABLE_1 = "sink_postgres_cdc_table_1";
     private static final String SINK_TABLE_2 = "sink_postgres_cdc_table_2";
     private static final String SINK_TABLE_3 = "sink_postgres_cdc_table_3";
     private static final String SINK_TABLE_4 = "sink_postgres_cdc_table_4";
+    private static final String SINK_TABLE_5 = "sink_postgres_cdc_table_5";
 
     private static final String SOURCE_TABLE_NO_PRIMARY_KEY = 
"full_types_no_primary_key";
 
@@ -804,6 +806,35 @@ public class PostgresCDCIT extends TestSuiteBase 
implements TestResource {
         }
     }
 
+    @TestTemplate
+    public void testPostgresCdcCheckDataWithNetworkAddressTypes(TestContainer 
container) {
+        try {
+            CompletableFuture.supplyAsync(
+                    () -> {
+                        try {
+                            container.executeJob(
+                                    
"/postgrescdc_to_postgres_with_network_address_types.conf");
+                        } catch (Exception e) {
+                            log.error("Commit task exception :" + 
e.getMessage());
+                            throw new RuntimeException(e);
+                        }
+                        return null;
+                    });
+
+            // stream stage
+            await().atMost(60000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                Assertions.assertIterableEquals(
+                                        query(getQuerySQL(POSTGRESQL_SCHEMA, 
SOURCE_TABLE_5)),
+                                        query(getQuerySQL(POSTGRESQL_SCHEMA, 
SINK_TABLE_5)));
+                            });
+        } finally {
+            clearTable(POSTGRESQL_SCHEMA, SOURCE_TABLE_5);
+            clearTable(POSTGRESQL_SCHEMA, SINK_TABLE_5);
+        }
+    }
+
     @Test
     public void testDialectCheckDisabledCDCTable() throws SQLException {
         JdbcSourceConfigFactory factory =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
index 095a593cb1..c823ac5a3d 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/ddl/inventory.sql
@@ -216,6 +216,32 @@ CREATE TABLE sink_postgres_cdc_table_4
     PRIMARY KEY (id)
 );
 
+CREATE TABLE postgres_cdc_table_5
+(
+    id        INTEGER NOT NULL,
+    f_bytea   BYTEA,
+    f_small   SMALLINT,
+    f_interval INTERVAL,
+    ip        INET,
+    network   CIDR,
+    mac       MACADDR,
+    mac8      MACADDR8,
+    PRIMARY KEY (id)
+);
+
+CREATE TABLE sink_postgres_cdc_table_5
+(
+    id        INTEGER NOT NULL,
+    f_bytea   BYTEA,
+    f_small   SMALLINT,
+    f_interval INTERVAL,
+    ip        INET,
+    network   CIDR,
+    mac       MACADDR,
+    mac8      MACADDR8,
+    PRIMARY KEY (id)
+);
+
 ALTER TABLE postgres_cdc_table_1
     REPLICA IDENTITY FULL;
 
@@ -228,6 +254,9 @@ ALTER TABLE postgres_cdc_table_3
 ALTER TABLE postgres_cdc_table_4
     REPLICA IDENTITY FULL;
 
+ALTER TABLE postgres_cdc_table_5
+    REPLICA IDENTITY FULL;
+
 ALTER TABLE sink_postgres_cdc_table_1
     REPLICA IDENTITY FULL;
 
@@ -256,6 +285,9 @@ VALUES (1, '2', 32767, 65535);
 INSERT INTO postgres_cdc_table_4
 VALUES (1, '2', 32767, INTERVAL '2 days 3 hours');
 
+INSERT INTO postgres_cdc_table_5 (id, f_bytea, f_small, f_interval, ip, 
network, mac, mac8)
+VALUES (1, '2', 32767, INTERVAL '1 day 2 hours', '192.168.1.100', 
'192.168.1.0/24', '08:00:2b:01:02:03', '08:00:2b:01:02:03:04:05');
+
 INSERT INTO full_types_no_primary_key
 VALUES (1, '2', 32767, 65535, 2147483647, 5.5, 6.6, 123.12345, 404.4443, true,
         'Hello World', 'a', 'abc', 'abcd..xyz', '2020-07-17 18:00:22.123', 
'2020-07-17 18:00:22.123456',
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_network_address_types.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_network_address_types.conf
new file mode 100644
index 0000000000..6c096bca14
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/resources/postgrescdc_to_postgres_with_network_address_types.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  read_limit.bytes_per_second=7000000
+  read_limit.rows_per_second=400
+}
+
+source {
+  Postgres-CDC {
+    plugin_output = "customers_postgres_cdc"
+    username = "postgres"
+    password = "postgres"
+    database-names = ["postgres_cdc"]
+    schema-names = ["inventory"]
+    table-names = ["postgres_cdc.inventory.postgres_cdc_table_5"]
+    base-url = 
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+  }
+}
+
+transform {
+
+}
+
+sink {
+  jdbc {
+    plugin_input = "customers_postgres_cdc"
+    url = 
"jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
+    driver = "org.postgresql.Driver"
+    user = "postgres"
+    password = "postgres"
+
+    generate_sink_sql = true
+    # You need to configure both database and table
+    database = postgres_cdc
+    table = inventory.sink_postgres_cdc_table_5
+    primary_keys = ["id"]
+  }
+}

Reply via email to