This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5f5d4da13f [BUG][Connector-V2][Jdbc] support postgresql xml type
(#5724)
5f5d4da13f is described below
commit 5f5d4da13f99344880e18620d6c540a6ad82deb7
Author: kk <[email protected]>
AuthorDate: Fri Oct 27 14:19:23 2023 +0800
[BUG][Connector-V2][Jdbc] support postgresql xml type (#5724)
---
.../jdbc/catalog/psql/PostgresDataTypeConvertor.java | 2 ++
.../jdbc/internal/dialect/psql/PostgresTypeMapper.java | 2 ++
.../connectors/seatunnel/jdbc/JdbcPostgresIT.java | 18 ++++++++++++------
.../test/resources/jdbc_postgres_source_and_sink.conf | 2 +-
.../jdbc_postgres_source_and_sink_parallel.conf | 6 +++---
..._postgres_source_and_sink_parallel_upper_lower.conf | 6 +++---
.../resources/jdbc_postgres_source_and_sink_xa.conf | 6 +++---
7 files changed, 26 insertions(+), 16 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
index 9aa1d89d1e..2eaa87f862 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
@@ -108,6 +108,7 @@ public class PostgresDataTypeConvertor implements
DataTypeConvertor<String> {
public static final String PG_GEOGRAPHY = "geography";
public static final String PG_JSON = "json";
public static final String PG_JSONB = "jsonb";
+ public static final String PG_XML = "xml";
@Override
public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
@@ -165,6 +166,7 @@ public class PostgresDataTypeConvertor implements
DataTypeConvertor<String> {
case PG_GEOGRAPHY:
case PG_JSON:
case PG_JSONB:
+ case PG_XML:
return BasicType.STRING_TYPE;
case PG_CHAR_ARRAY:
case PG_CHARACTER_ARRAY:
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java
index 1d82f687c1..d3a950911c 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java
@@ -88,6 +88,7 @@ public class PostgresTypeMapper implements
JdbcDialectTypeMapper {
private static final String PG_GEOGRAPHY = "geography";
private static final String PG_JSON = "json";
private static final String PG_JSONB = "jsonb";
+ private static final String PG_XML = "xml";
@Override
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int
colIndex)
@@ -142,6 +143,7 @@ public class PostgresTypeMapper implements
JdbcDialectTypeMapper {
case PG_GEOGRAPHY:
case PG_JSON:
case PG_JSONB:
+ case PG_XML:
return BasicType.STRING_TYPE;
case PG_CHAR_ARRAY:
case PG_CHARACTER_ARRAY:
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index bf2046ed26..095de999bd 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -104,7 +104,8 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " geometrycollection geometry(GEOMETRYCOLLECTION,
4326),\n"
+ " geog geography(POINT, 4326),\n"
+ " json_col json NOT NULL,\n"
- + " jsonb_col jsonb NOT NULL\n"
+ + " jsonb_col jsonb NOT NULL,\n"
+ + " xml_col xml NOT NULL\n"
+ ")";
private static final String PG_SINK_DDL =
"CREATE TABLE IF NOT EXISTS pg_e2e_sink_table (\n"
@@ -137,7 +138,8 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " geometrycollection varchar(2000) NULL,\n"
+ " geog varchar(2000) NULL,\n"
+ " json_col json NOT NULL \n,"
- + " jsonb_col jsonb NOT NULL\n"
+ + " jsonb_col jsonb NOT NULL,\n"
+ + " xml_col xml NOT NULL\n"
+ " )";
private static final String SOURCE_SQL =
"select \n"
@@ -170,7 +172,8 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ "geometrycollection,\n"
+ "geog,\n"
+ "json_col,\n"
- + "jsonb_col\n"
+ + "jsonb_col,\n"
+ + " cast(xml_col as varchar) \n"
+ "from pg_e2e_source_table";
private static final String SINK_SQL =
"select\n"
@@ -203,7 +206,8 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " cast(geometrycollection as geometry) as
geometrycollection,\n"
+ " cast(geog as geography) as geog,\n"
+ " json_col,\n"
- + " jsonb_col\n"
+ + " jsonb_col,\n"
+ + " cast(xml_col as varchar) \n"
+ "from\n"
+ " pg_e2e_sink_table";
@@ -333,7 +337,8 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " geometrycollection,\n"
+ " geog,\n"
+ " json_col,\n"
- + " jsonb_col \n"
+ + " jsonb_col, \n"
+ + " xml_col \n"
+ " )\n"
+ "VALUES\n"
+ " (\n"
@@ -386,7 +391,8 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " ),\n"
+ " ST_GeographyFromText('POINT(-122.3452
47.5925)'),\n"
+ " '{\"key\":\"test\"}',\n"
- + " '{\"key\":\"test\"}'\n"
+ + " '{\"key\":\"test\"}',\n"
+ + " '<XX:NewSize>test</XX:NewSize>'\n"
+ " )");
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
index 7a34a4f49c..4731bbc70d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
@@ -28,7 +28,7 @@ source{
password = "test"
query ="""select gid, text_col, varchar_col, char_col, boolean_col,
smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
- multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col from pg_e2e_source_table"""
+ multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col,xml_col from pg_e2e_source_table"""
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
index 58feafe102..0cd5325d33 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
@@ -28,7 +28,7 @@ source{
password = "test"
query ="""select gid, text_col, varchar_col, char_col, boolean_col,
smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
- multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col from pg_e2e_source_table"""
+ multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col,xml_col from pg_e2e_source_table"""
partition_column= "gid"
result_table_name = "jdbc"
@@ -47,7 +47,7 @@ sink {
connection_check_timeout_sec = 100
query ="""INSERT INTO pg_e2e_sink_table ( gid, text_col, varchar_col,
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col,
numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col,
bpchar_col, age, name, point,
- linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col)
- VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"""
+ linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col,xml_col)
+ VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,? )"""
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
index 4a98ab6477..30846149d7 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
@@ -28,7 +28,7 @@ source{
password = "test"
query ="""select gid, text_col, varchar_col, char_col, boolean_col,
smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
- multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col from pg_e2e_source_table"""
+ multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col,xml_col from pg_e2e_source_table"""
partition_column= "gid"
result_table_name = "jdbc"
@@ -51,7 +51,7 @@ sink {
connection_check_timeout_sec = 100
query ="""INSERT INTO pg_e2e_sink_table ( gid, text_col, varchar_col,
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col,
numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col,
bpchar_col, age, name, point,
- linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col )
- VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"""
+ linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col,xml_col )
+ VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? ,?)"""
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
index d135b19376..144f11da9b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
@@ -29,7 +29,7 @@ source {
password = "test"
query ="""select gid, text_col, varchar_col, char_col, boolean_col,
smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
- multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col from pg_e2e_source_table"""
+ multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col ,xml_col from pg_e2e_source_table"""
}
}
@@ -45,8 +45,8 @@ sink {
max_retries = 0
query ="""INSERT INTO pg_e2e_sink_table ( gid, text_col, varchar_col,
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col,
numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col,
bpchar_col, age, name, point,
- linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col )
- VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"""
+ linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col ,xml_col)
+ VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)"""
is_exactly_once = "true"