This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7a862d14b7 [BUG][Connector-V2][Jdbc] support postgresql json type
(#5194)
7a862d14b7 is described below
commit 7a862d14b733147fa4b15227fe36c88656175554
Author: kk <[email protected]>
AuthorDate: Wed Aug 9 11:08:03 2023 +0800
[BUG][Connector-V2][Jdbc] support postgresql json type (#5194)
* add Postgresql json type
Co-authored-by: 80597928 <[email protected]>
---
docs/en/connector-v2/sink/PostgreSql.md | 50 ++++++++++++----------
docs/en/connector-v2/source/PostgreSQL.md | 44 +++++++++----------
.../catalog/psql/PostgresDataTypeConvertor.java | 4 ++
.../internal/dialect/psql/PostgresTypeMapper.java | 4 ++
.../connectors/seatunnel/jdbc/JdbcPostgresIT.java | 26 ++++++++---
.../resources/jdbc_postgres_source_and_sink.conf | 4 +-
.../jdbc_postgres_source_and_sink_parallel.conf | 8 ++--
...tgres_source_and_sink_parallel_upper_lower.conf | 8 ++--
.../jdbc_postgres_source_and_sink_xa.conf | 8 ++--
9 files changed, 90 insertions(+), 66 deletions(-)
diff --git a/docs/en/connector-v2/sink/PostgreSql.md
b/docs/en/connector-v2/sink/PostgreSql.md
index 3cb2b82811..67e2ed64d9 100644
--- a/docs/en/connector-v2/sink/PostgreSql.md
+++ b/docs/en/connector-v2/sink/PostgreSql.md
@@ -36,34 +36,34 @@ semantics (using XA transaction guarantee).
## Data Type Mapping
-| PostgreSQL Data type |
SeaTunnel Data type
|
-|----------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------|
-| BOOL<br/> |
BOOLEAN
|
-| _BOOL<br/> |
ARRAY<BOOLEAN>
|
-| BYTEA<br/> | BYTES
|
-| _BYTEA<br/> |
ARRAY<TINYINT>
|
-| INT2<br/>SMALLSERIAL<br/>INT4<br/>SERIAL<br/> | INT
|
-| _INT2<br/>_INT4<br/> |
ARRAY<INT>
|
-| INT8<br/>BIGSERIAL<br/> |
BIGINT
|
-| _INT8<br/> |
ARRAY<BIGINT>
|
-| FLOAT4<br/> | FLOAT
|
-| _FLOAT4<br/> |
ARRAY<FLOAT>
|
-| FLOAT8<br/> |
DOUBLE
|
-| _FLOAT8<br/> |
ARRAY<DOUBLE>
|
-| NUMERIC(Get the designated column's specified column size>0) |
DECIMAL(Get the designated column's specified column size,Gets the number of
digits in the specified column to the right of the decimal point) |
-| NUMERIC(Get the designated column's specified column size<0) |
DECIMAL(38, 18)
|
-| BPCHAR<br/>CHARACTER<br/>VARCHAR<br/>TEXT<br/>GEOMETRY<br/>GEOGRAPHY |
STRING
|
-| _BPCHAR<br/>_CHARACTER<br/>_VARCHAR<br/>_TEXT |
ARRAY<STRING>
|
-| TIMESTAMP<br/> |
TIMESTAMP
|
-| TIME<br/> | TIME
|
-| DATE<br/> | DATE
|
-| OTHER DATA TYPES | NOT
SUPPORTED YET
|
+| PostgreSQL Data type
|
SeaTunnel Data type
|
+|-----------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------|
+| BOOL<br/>
| BOOLEAN
|
+| _BOOL<br/>
| ARRAY<BOOLEAN>
|
+| BYTEA<br/>
| BYTES
|
+| _BYTEA<br/>
| ARRAY<TINYINT>
|
+| INT2<br/>SMALLSERIAL<br/>INT4<br/>SERIAL<br/>
| INT
|
+| _INT2<br/>_INT4<br/>
| ARRAY<INT>
|
+| INT8<br/>BIGSERIAL<br/>
| BIGINT
|
+| _INT8<br/>
| ARRAY<BIGINT>
|
+| FLOAT4<br/>
| FLOAT
|
+| _FLOAT4<br/>
| ARRAY<FLOAT>
|
+| FLOAT8<br/>
| DOUBLE
|
+| _FLOAT8<br/>
| ARRAY<DOUBLE>
|
+| NUMERIC(Get the designated column's specified column size>0)
| DECIMAL(Get the designated column's specified column size,Gets the
number of digits in the specified column to the right of the decimal point) |
+| NUMERIC(Get the designated column's specified column size<0)
| DECIMAL(38, 18)
|
+|
BPCHAR<br/>CHARACTER<br/>VARCHAR<br/>TEXT<br/>GEOMETRY<br/>GEOGRAPHY<br/>JSON<br/>JSONB
| STRING
|
+| _BPCHAR<br/>_CHARACTER<br/>_VARCHAR<br/>_TEXT
| ARRAY<STRING>
|
+| TIMESTAMP<br/>
| TIMESTAMP
|
+| TIME<br/>
| TIME
|
+| DATE<br/>
| DATE
|
+| OTHER DATA TYPES
| NOT SUPPORTED YET
|
## Options
| Name | Type | Required | Default |
Description
|
|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| url | String | Yes | - |
The URL of the JDBC connection. Refer to a case:
jdbc:postgresql://localhost:5432/test
|
+| url | String | Yes | - |
The URL of the JDBC connection. Refer to a case:
jdbc:postgresql://localhost:5432/test <br/> To insert into json or jsonb
columns, add the stringtype=unspecified option to the JDBC URL.
|
| driver | String | Yes | - |
The jdbc class name used to connect to the remote data source,<br/> if you use
PostgreSQL the value is `org.postgresql.Driver`.
|
| user | String | No | - |
Connection instance user name
|
| password | String | No | - |
Connection instance password
|
@@ -124,6 +124,7 @@ transform {
sink {
jdbc {
+ # To insert into json or jsonb columns, add the
stringtype=unspecified option to the JDBC URL
url = "jdbc:postgresql://localhost:5432/test"
driver = "org.postgresql.Driver"
user = root
@@ -142,6 +143,7 @@ sink {
```
sink {
Jdbc {
+ # To insert into json or jsonb columns, add the
stringtype=unspecified option to the JDBC URL
url = "jdbc:postgresql://localhost:5432/test"
driver = org.postgresql.Driver
user = root
@@ -161,6 +163,7 @@ sink {
```
sink {
jdbc {
+ # To insert into json or jsonb columns, add the
stringtype=unspecified option to the JDBC URL
url = "jdbc:postgresql://localhost:5432/test"
driver = "org.postgresql.Driver"
@@ -183,6 +186,7 @@ sink {
```
sink {
jdbc {
+ # To insert into json or jsonb columns, add the
stringtype=unspecified option to the JDBC URL
url = "jdbc:postgresql://localhost:5432/test"
driver = "org.postgresql.Driver"
user = root
diff --git a/docs/en/connector-v2/source/PostgreSQL.md
b/docs/en/connector-v2/source/PostgreSQL.md
index 3f9e13d2e6..5083978072 100644
--- a/docs/en/connector-v2/source/PostgreSQL.md
+++ b/docs/en/connector-v2/source/PostgreSQL.md
@@ -38,28 +38,28 @@ Read external data source data through JDBC.
## Data Type Mapping
-| PostgreSQL Data type |
SeaTunnel Data type
|
-|----------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------|
-| BOOL<br/> |
BOOLEAN
|
-| _BOOL<br/> |
ARRAY<BOOLEAN>
|
-| BYTEA<br/> | BYTES
|
-| _BYTEA<br/> |
ARRAY<TINYINT>
|
-| INT2<br/>SMALLSERIAL<br/>INT4<br/>SERIAL<br/> | INT
|
-| _INT2<br/>_INT4<br/> |
ARRAY<INT>
|
-| INT8<br/>BIGSERIAL<br/> |
BIGINT
|
-| _INT8<br/> |
ARRAY<BIGINT>
|
-| FLOAT4<br/> | FLOAT
|
-| _FLOAT4<br/> |
ARRAY<FLOAT>
|
-| FLOAT8<br/> |
DOUBLE
|
-| _FLOAT8<br/> |
ARRAY<DOUBLE>
|
-| NUMERIC(Get the designated column's specified column size>0) |
DECIMAL(Get the designated column's specified column size,Gets the number of
digits in the specified column to the right of the decimal point) |
-| NUMERIC(Get the designated column's specified column size<0) |
DECIMAL(38, 18)
|
-| BPCHAR<br/>CHARACTER<br/>VARCHAR<br/>TEXT<br/>GEOMETRY<br/>GEOGRAPHY |
STRING
|
-| _BPCHAR<br/>_CHARACTER<br/>_VARCHAR<br/>_TEXT |
ARRAY<STRING>
|
-| TIMESTAMP<br/> |
TIMESTAMP
|
-| TIME<br/> | TIME
|
-| DATE<br/> | DATE
|
-| OTHER DATA TYPES | NOT
SUPPORTED YET
|
+| PostgreSQL Data type
|
SeaTunnel Data type
|
+|-----------------------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------|
+| BOOL<br/>
| BOOLEAN
|
+| _BOOL<br/>
| ARRAY<BOOLEAN>
|
+| BYTEA<br/>
| BYTES
|
+| _BYTEA<br/>
| ARRAY<TINYINT>
|
+| INT2<br/>SMALLSERIAL<br/>INT4<br/>SERIAL<br/>
| INT
|
+| _INT2<br/>_INT4<br/>
| ARRAY<INT>
|
+| INT8<br/>BIGSERIAL<br/>
| BIGINT
|
+| _INT8<br/>
| ARRAY<BIGINT>
|
+| FLOAT4<br/>
| FLOAT
|
+| _FLOAT4<br/>
| ARRAY<FLOAT>
|
+| FLOAT8<br/>
| DOUBLE
|
+| _FLOAT8<br/>
| ARRAY<DOUBLE>
|
+| NUMERIC(Get the designated column's specified column size>0)
| DECIMAL(Get the designated column's specified column size,Gets the
number of digits in the specified column to the right of the decimal point) |
+| NUMERIC(Get the designated column's specified column size<0)
| DECIMAL(38, 18)
|
+|
BPCHAR<br/>CHARACTER<br/>VARCHAR<br/>TEXT<br/>GEOMETRY<br/>GEOGRAPHY<br/>JSON<br/>JSONB
| STRING
|
+| _BPCHAR<br/>_CHARACTER<br/>_VARCHAR<br/>_TEXT
| ARRAY<STRING>
|
+| TIMESTAMP<br/>
| TIMESTAMP
|
+| TIME<br/>
| TIME
|
+| DATE<br/>
| DATE
|
+| OTHER DATA TYPES
| NOT SUPPORTED YET
|
## Options
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
index c87a2fc118..d1f8a5691d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresDataTypeConvertor.java
@@ -105,6 +105,8 @@ public class PostgresDataTypeConvertor implements
DataTypeConvertor<String> {
public static final String PG_INTERVAL = "interval";
public static final String PG_GEOMETRY = "geometry";
public static final String PG_GEOGRAPHY = "geography";
+ public static final String PG_JSON = "json";
+ public static final String PG_JSONB = "jsonb";
@Override
public SeaTunnelDataType<?> toSeaTunnelType(String connectorDataType) {
@@ -160,6 +162,8 @@ public class PostgresDataTypeConvertor implements
DataTypeConvertor<String> {
case PG_INTERVAL:
case PG_GEOMETRY:
case PG_GEOGRAPHY:
+ case PG_JSON:
+ case PG_JSONB:
return BasicType.STRING_TYPE;
case PG_CHAR_ARRAY:
case PG_CHARACTER_ARRAY:
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java
index 25004168d8..6edf6dcdaa 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresTypeMapper.java
@@ -86,6 +86,8 @@ public class PostgresTypeMapper implements
JdbcDialectTypeMapper {
private static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
private static final String PG_GEOMETRY = "geometry";
private static final String PG_GEOGRAPHY = "geography";
+ private static final String PG_JSON = "json";
+ private static final String PG_JSONB = "jsonb";
@SuppressWarnings("checkstyle:MagicNumber")
@Override
@@ -139,6 +141,8 @@ public class PostgresTypeMapper implements
JdbcDialectTypeMapper {
case PG_TEXT:
case PG_GEOMETRY:
case PG_GEOGRAPHY:
+ case PG_JSON:
+ case PG_JSONB:
return BasicType.STRING_TYPE;
case PG_CHAR_ARRAY:
case PG_CHARACTER_ARRAY:
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index 6a3eb231b2..f66ef615d7 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -95,7 +95,9 @@ public class JdbcPostgresIT extends TestSuiteBase implements
TestResource {
+ " multilinestring geometry(MULTILINESTRING, 4326),\n"
+ " multipolygon geometry(MULTIPOLYGON, 4326),\n"
+ " geometrycollection geometry(GEOMETRYCOLLECTION,
4326),\n"
- + " geog geography(POINT, 4326)\n"
+ + " geog geography(POINT, 4326),\n"
+ + " json_col json NOT NULL,\n"
+ + " jsonb_col jsonb NOT NULL\n"
+ ")";
private static final String PG_SINK_DDL =
"CREATE TABLE IF NOT EXISTS pg_e2e_sink_table (\n"
@@ -126,7 +128,9 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " multilinestring varchar(2000) NULL,\n"
+ " multipolygon varchar(2000) NULL,\n"
+ " geometrycollection varchar(2000) NULL,\n"
- + " geog varchar(2000) NULL\n"
+ + " geog varchar(2000) NULL,\n"
+ + " json_col json NOT NULL \n,"
+ + " jsonb_col jsonb NOT NULL\n"
+ " )";
private static final String SOURCE_SQL =
"select \n"
@@ -157,8 +161,10 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ "multilinestring,\n"
+ "multipolygon,\n"
+ "geometrycollection,\n"
- + "geog\n"
- + " from pg_e2e_source_table";
+ + "geog,\n"
+ + "json_col,\n"
+ + "jsonb_col\n"
+ + "from pg_e2e_source_table";
private static final String SINK_SQL =
"select\n"
+ " gid,\n"
@@ -188,7 +194,9 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " cast(multilinestring as geometry) as
multilinestring,\n"
+ " cast(multipolygon as geometry) as multilinestring,\n"
+ " cast(geometrycollection as geometry) as
geometrycollection,\n"
- + " cast(geog as geography) as geog\n"
+ + " cast(geog as geography) as geog,\n"
+ + " json_col,\n"
+ + " jsonb_col\n"
+ "from\n"
+ " pg_e2e_sink_table";
@@ -279,7 +287,9 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " multilinestring,\n"
+ " multipolygon,\n"
+ " geometrycollection,\n"
- + " geog\n"
+ + " geog,\n"
+ + " json_col,\n"
+ + " jsonb_col \n"
+ " )\n"
+ "VALUES\n"
+ " (\n"
@@ -330,7 +340,9 @@ public class JdbcPostgresIT extends TestSuiteBase
implements TestResource {
+ " 'GEOMETRYCOLLECTION(POINT(-122.3462
47.5921), LINESTRING(-122.3460 47.5924, -122.3457 47.5924))',\n"
+ " 4326\n"
+ " ),\n"
- + " ST_GeographyFromText('POINT(-122.3452
47.5925)')\n"
+ + " ST_GeographyFromText('POINT(-122.3452
47.5925)'),\n"
+ + " '{\"key\":\"test\"}',\n"
+ + " '{\"key\":\"test\"}'\n"
+ " )");
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
index f3293f44e6..7a34a4f49c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink.conf
@@ -28,7 +28,7 @@ source{
password = "test"
query ="""select gid, text_col, varchar_col, char_col, boolean_col,
smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
- multilinestring, multipolygon, geometrycollection,
geog from pg_e2e_source_table"""
+ multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col from pg_e2e_source_table"""
}
}
@@ -36,7 +36,7 @@ source{
sink {
Jdbc {
driver = org.postgresql.Driver
- url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
+ url =
"jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF&stringtype=unspecified"
user = test
password = test
generate_sink_sql = true
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
index 25df382c4a..58feafe102 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel.conf
@@ -28,7 +28,7 @@ source{
password = "test"
query ="""select gid, text_col, varchar_col, char_col, boolean_col,
smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
- multilinestring, multipolygon, geometrycollection,
geog from pg_e2e_source_table"""
+ multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col from pg_e2e_source_table"""
partition_column= "gid"
result_table_name = "jdbc"
@@ -40,14 +40,14 @@ transform {
sink {
jdbc {
- url = "jdbc:postgresql://postgresql:5432/test"
+ url = "jdbc:postgresql://postgresql:5432/test?stringtype=unspecified"
driver = "org.postgresql.Driver"
user = "test"
password = "test"
connection_check_timeout_sec = 100
query ="""INSERT INTO pg_e2e_sink_table ( gid, text_col, varchar_col,
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col,
numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col,
bpchar_col, age, name, point,
- linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog )
- VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"""
+ linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col)
+ VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"""
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
index 46f1b43022..4a98ab6477 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_parallel_upper_lower.conf
@@ -28,7 +28,7 @@ source{
password = "test"
query ="""select gid, text_col, varchar_col, char_col, boolean_col,
smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
- multilinestring, multipolygon, geometrycollection,
geog from pg_e2e_source_table"""
+ multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col from pg_e2e_source_table"""
partition_column= "gid"
result_table_name = "jdbc"
@@ -43,7 +43,7 @@ transform {
sink {
jdbc {
- url = "jdbc:postgresql://postgresql:5432/test"
+ url = "jdbc:postgresql://postgresql:5432/test?stringtype=unspecified"
driver = "org.postgresql.Driver"
user = "test"
@@ -51,7 +51,7 @@ sink {
connection_check_timeout_sec = 100
query ="""INSERT INTO pg_e2e_sink_table ( gid, text_col, varchar_col,
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col,
numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col,
bpchar_col, age, name, point,
- linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog )
- VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"""
+ linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col )
+ VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"""
}
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
index ba32ca81bc..d135b19376 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
@@ -29,7 +29,7 @@ source {
password = "test"
query ="""select gid, text_col, varchar_col, char_col, boolean_col,
smallint_col, integer_col, bigint_col, decimal_col, numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col,
timestamp_col, bpchar_col, age, name, point, linestring, polygon_colums,
multipoint,
- multilinestring, multipolygon, geometrycollection,
geog from pg_e2e_source_table"""
+ multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col from pg_e2e_source_table"""
}
}
@@ -38,15 +38,15 @@ transform {
sink {
jdbc {
- url = "jdbc:postgresql://postgresql:5432/test"
+ url = "jdbc:postgresql://postgresql:5432/test?stringtype=unspecified"
driver = "org.postgresql.Driver"
user = "test"
password = "test"
max_retries = 0
query ="""INSERT INTO pg_e2e_sink_table ( gid, text_col, varchar_col,
char_col, boolean_col, smallint_col, integer_col, bigint_col, decimal_col,
numeric_col, real_col,
double_precision_col,
smallserial_col, serial_col, bigserial_col, date_col, timestamp_col,
bpchar_col, age, name, point,
- linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog )
- VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"""
+ linestring,
polygon_colums, multipoint, multilinestring, multipolygon, geometrycollection,
geog, json_col, jsonb_col )
+ VALUES( ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"""
is_exactly_once = "true"