This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 84ce516929 [Feature][Connector] update pgsql catalog for save mode (#6080)
84ce516929 is described below
commit 84ce516929af7810e4c37905c90728e745527e8c
Author: 老王 <[email protected]>
AuthorDate: Sat Jan 13 11:05:48 2024 +0800
[Feature][Connector] update pgsql catalog for save mode (#6080)
---
docs/en/connector-v2/sink/PostgreSql.md | 103 +++++++++++++----
.../jdbc/catalog/psql/PostgresCatalog.java | 13 +++
.../connectors/seatunnel/jdbc/JdbcPostgresIT.java | 127 +++++++++++++++++++++
3 files changed, 220 insertions(+), 23 deletions(-)
diff --git a/docs/en/connector-v2/sink/PostgreSql.md b/docs/en/connector-v2/sink/PostgreSql.md
index fc70ad0b55..6164c54942 100644
--- a/docs/en/connector-v2/sink/PostgreSql.md
+++ b/docs/en/connector-v2/sink/PostgreSql.md
@@ -61,29 +61,66 @@ semantics (using XA transaction guarantee).
## Options
-| Name                                      | Type    | Required | Default | Description |
-|-------------------------------------------|---------|----------|---------|-------------|
-| url                                       | String  | Yes      | -       | The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost:5432/test <br/> if you would use json or jsonb type insert please add jdbc url stringtype=unspecified option |
-| driver                                    | String  | Yes      | -       | The jdbc class name used to connect to the remote data source,<br/> if you use PostgreSQL the value is `org.postgresql.Driver`. |
-| user                                      | String  | No       | -       | Connection instance user name |
-| password                                  | String  | No       | -       | Connection instance password |
-| query                                     | String  | No       | -       | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority |
-| database                                  | String  | No       | -       | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.<br/>This option is mutually exclusive with `query` and has a higher priority. |
-| table                                     | String  | No       | -       | Use database and this table-name auto-generate sql and receive upstream input datas write to database.<br/>This option is mutually exclusive with `query` and has a higher priority. |
-| primary_keys                              | Array   | No       | -       | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. |
-| support_upsert_by_query_primary_key_exist | Boolean | No       | false   | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance |
-| connection_check_timeout_sec              | Int     | No       | 30      | The time in seconds to wait for the database operation used to validate the connection to complete. |
-| max_retries                               | Int     | No       | 0       | The number of retries to submit failed (executeBatch) |
-| batch_size                                | Int     | No       | 1000    | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`<br/>, the data will be flushed into the database |
-| is_exactly_once                           | Boolean | No       | false   | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to<br/>set `xa_data_source_class_name`. |
-| generate_sink_sql                         | Boolean | No       | false   | Generate sql statements based on the database table you want to write to. |
-| xa_data_source_class_name                 | String  | No       | -       | The xa data source class name of the database Driver, for example, PostgreSQL is `org.postgresql.xa.PGXADataSource`, and<br/>please refer to appendix for other data sources |
-| max_commit_attempts                       | Int     | No       | 3       | The number of retries for transaction commit failures |
-| transaction_timeout_sec                   | Int     | No       | -1      | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect<br/>exactly-once semantics |
-| auto_commit                               | Boolean | No       | true    | Automatic transaction commit is enabled by default |
-| field_ide                                 | String  | No       | -       | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates conversion to lowercase. |
-| properties                                | Map     | No       | -       | Additional connection configuration parameters,when properties and URL have the same parameters, the priority is determined by the <br/>specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. |
-| common-options                            |         | no       | -       | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details |
+| Name                                      | Type    | Required | Default                      | Description |
+|-------------------------------------------|---------|----------|------------------------------|-------------|
+| url                                       | String  | Yes      | -                            | The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost:5432/test <br/> if you would use json or jsonb type insert please add jdbc url stringtype=unspecified option |
+| driver                                    | String  | Yes      | -                            | The jdbc class name used to connect to the remote data source,<br/> if you use PostgreSQL the value is `org.postgresql.Driver`. |
+| user                                      | String  | No       | -                            | Connection instance user name |
+| password                                  | String  | No       | -                            | Connection instance password |
+| query                                     | String  | No       | -                            | Use this SQL to write upstream input data to the database, e.g. `INSERT ...`; `query` has the higher priority |
+| database                                  | String  | No       | -                            | Use this `database` and `table-name` to auto-generate SQL and write the received upstream input data to the database.<br/>This option is mutually exclusive with `query` and has a higher priority. |
+| table                                     | String  | No       | -                            | Use `database` and this `table-name` to auto-generate SQL and write the received upstream input data to the database.<br/>This option is mutually exclusive with `query` and has a higher priority. The table parameter can be set to the name of a table that does not yet exist, which will be used as the name of the created table, and supports variables (`${table_name}`, `${schema_name}`). Replacement rules: `${schema_name}` will be replaced by the schema name passed to the target side, and `${table_name}` will be replaced by the table name passed to the target side. |
+| primary_keys                              | Array   | No       | -                            | This option is used to support operations such as `insert`, `delete`, and `update` when the SQL is automatically generated. |
+| support_upsert_by_query_primary_key_exist | Boolean | No       | false                        | Choose to use INSERT and UPDATE SQL to process update events (INSERT, UPDATE_AFTER) based on whether the queried primary key exists. This configuration is only used when the database does not support upsert syntax. **Note**: this method has low performance |
+| connection_check_timeout_sec              | Int     | No       | 30                           | The time in seconds to wait for the database operation used to validate the connection to complete. |
+| max_retries                               | Int     | No       | 0                            | The number of retries to resubmit a failed request (executeBatch) |
+| batch_size                                | Int     | No       | 1000                         | For batch writing, when the number of buffered records reaches `batch_size` or the time reaches `checkpoint.interval`<br/>, the data will be flushed into the database |
+| is_exactly_once                           | Boolean | No       | false                        | Whether to enable exactly-once semantics, which will use XA transactions. If on, you need to<br/>set `xa_data_source_class_name`. |
+| generate_sink_sql                         | Boolean | No       | false                        | Generate SQL statements based on the database table you want to write to. |
+| xa_data_source_class_name                 | String  | No       | -                            | The XA data source class name of the database driver; for example, for PostgreSQL it is `org.postgresql.xa.PGXADataSource`.<br/>Please refer to the appendix for other data sources |
+| max_commit_attempts                       | Int     | No       | 3                            | The number of retries for transaction commit failures |
+| transaction_timeout_sec                   | Int     | No       | -1                           | The timeout after the transaction is opened; the default is -1 (never time out). Note that setting a timeout may affect<br/>exactly-once semantics |
+| auto_commit                               | Boolean | No       | true                         | Automatic transaction commit is enabled by default |
+| field_ide                                 | String  | No       | -                            | Identify whether the field needs to be converted when synchronizing from the source to the sink. `ORIGINAL` indicates no conversion is needed; `UPPERCASE` indicates conversion to uppercase; `LOWERCASE` indicates conversion to lowercase. |
+| properties                                | Map     | No       | -                            | Additional connection configuration parameters. When properties and the URL have the same parameters, the priority is determined by the<br/>specific implementation of the driver. For example, in MySQL, properties take precedence over the URL. |
+| common-options                            |         | No       | -                            | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details |
+| schema_save_mode                          | Enum    | No       | CREATE_SCHEMA_WHEN_NOT_EXIST | Before the synchronization task starts, choose how to handle an existing table structure on the target side. |
+| data_save_mode                            | Enum    | No       | APPEND_DATA                  | Before the synchronization task starts, choose how to handle existing data on the target side. |
+| custom_sql                                | String  | No       | -                            | When data_save_mode is set to CUSTOM_PROCESSING, you should fill in the custom_sql parameter with an executable SQL statement. The SQL will be executed before the synchronization task starts. |
+
+### table [string]
+
+Use `database` and this `table-name` to auto-generate SQL and write the received upstream input data to the database.
+
+This option is mutually exclusive with `query` and has a higher priority.
+
+The table parameter can be set to the name of a table that does not yet exist; it will be used as the name of the created table, and supports variables (`${table_name}`, `${schema_name}`). Replacement rules: `${schema_name}` will be replaced by the schema name passed to the target side, and `${table_name}` will be replaced by the table name passed to the target side.
+
+For example:
+1. ${schema_name}.${table_name}_test
+2. dbo.tt_${table_name}_sink
+3. public.sink_table
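The replacement rules above amount to simple string substitution. The helper below is a hypothetical sketch of that behavior (`TableNameResolver` and `resolve` are illustrative names, not the connector's actual implementation):

```java
// Hypothetical sketch of the ${schema_name}/${table_name} replacement rules
// described above; not SeaTunnel's actual implementation.
public class TableNameResolver {
    public static String resolve(String template, String schemaName, String tableName) {
        // Each placeholder is replaced by the corresponding name from the source table.
        return template
                .replace("${schema_name}", schemaName)
                .replace("${table_name}", tableName);
    }

    public static void main(String[] args) {
        // Mirrors the examples above, assuming an upstream table "source_tb" in schema "public".
        System.out.println(resolve("${schema_name}.${table_name}_test", "public", "source_tb")); // public.source_tb_test
        System.out.println(resolve("dbo.tt_${table_name}_sink", "public", "source_tb"));         // dbo.tt_source_tb_sink
        System.out.println(resolve("public.sink_table", "public", "source_tb"));                 // public.sink_table
    }
}
```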
+
+### schema_save_mode [Enum]
+
+Before the synchronization task starts, choose how to handle an existing table structure on the target side.
+Option introduction:
+`RECREATE_SCHEMA`: the table will be created when it does not exist, and dropped and recreated when it already exists
+`CREATE_SCHEMA_WHEN_NOT_EXIST`: the table will be created when it does not exist, and skipped when it already exists
+`ERROR_WHEN_SCHEMA_NOT_EXIST`: an error will be reported when the table does not exist
+
+### data_save_mode [Enum]
+
+Before the synchronization task starts, choose how to handle existing data on the target side.
+Option introduction:
+`DROP_DATA`: preserve the database structure and delete the data
+`APPEND_DATA`: preserve the database structure and preserve the data
+`CUSTOM_PROCESSING`: user-defined processing
+`ERROR_WHEN_DATA_EXISTS`: an error is reported when data already exists
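As a rough sketch, a sink could dispatch on these options as follows before starting a task (the enum, method, and return values are hypothetical illustrations, not SeaTunnel's API):

```java
// Hypothetical sketch of how a sink might act on data_save_mode before a
// synchronization task starts; names are illustrative, not SeaTunnel's API.
public class DataSaveModeSketch {
    enum DataSaveMode { DROP_DATA, APPEND_DATA, CUSTOM_PROCESSING, ERROR_WHEN_DATA_EXISTS }

    // Returns the SQL (if any) to run against the target before syncing.
    static String plan(DataSaveMode mode, boolean dataExists, String customSql) {
        switch (mode) {
            case DROP_DATA:
                return "TRUNCATE TABLE target";  // keep structure, delete data
            case APPEND_DATA:
                return "-- no-op";               // keep structure and data
            case CUSTOM_PROCESSING:
                return customSql;                // user-supplied SQL runs first
            case ERROR_WHEN_DATA_EXISTS:
                if (dataExists) throw new IllegalStateException("data already exists on target");
                return "-- no-op";
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        System.out.println(plan(DataSaveMode.DROP_DATA, true, null));   // TRUNCATE TABLE target
        System.out.println(plan(DataSaveMode.APPEND_DATA, true, null)); // -- no-op
    }
}
```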
+
+### custom_sql [String]
+
+When data_save_mode is set to CUSTOM_PROCESSING, you should fill in the custom_sql parameter with an executable SQL statement. The SQL will be executed before the synchronization task starts.
### Tips
@@ -203,3 +240,23 @@ sink {
}
```
+### Save mode function
+
+```
+sink {
+ Jdbc {
+ # if you would use json or jsonb type insert please add jdbc url stringtype=unspecified option
+ url = "jdbc:postgresql://localhost:5432/test"
+ driver = org.postgresql.Driver
+ user = root
+ password = 123456
+
+ generate_sink_sql = true
+ database = test
+ table = "public.test_table"
+ schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+ data_save_mode = "APPEND_DATA"
+ }
+}
+```
+
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
index a164b0e3c8..8bc7932c68 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
@@ -230,6 +230,19 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
return "CREATE DATABASE \"" + databaseName + "\"";
}
+    public String getExistDataSql(TablePath tablePath) {
+        String schemaName = tablePath.getSchemaName();
+        String tableName = tablePath.getTableName();
+        return String.format("select * from \"%s\".\"%s\" limit 1", schemaName, tableName);
+    }
+
+    @Override
+    protected String getTruncateTableSql(TablePath tablePath) {
+        String schemaName = tablePath.getSchemaName();
+        String tableName = tablePath.getTableName();
+        return "TRUNCATE TABLE \"" + schemaName + "\".\"" + tableName + "\"";
+    }
+
@Override
protected String getDropDatabaseSql(String databaseName) {
return "DROP DATABASE \"" + databaseName + "\"";
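The two methods added to `PostgresCatalog` above only build SQL strings from the schema and table names. A standalone mirror of that string construction (outside the catalog class, purely for illustration) looks like this:

```java
// Standalone mirror of the SQL built by getExistDataSql/getTruncateTableSql
// in the diff above, for illustration; PostgresCatalog itself is not used here.
public class PgSqlSketch {
    // SELECT one row to cheaply check whether any data exists in the table.
    static String existDataSql(String schemaName, String tableName) {
        return String.format("select * from \"%s\".\"%s\" limit 1", schemaName, tableName);
    }

    // TRUNCATE keeps the table structure and deletes all rows (used by DROP_DATA).
    static String truncateTableSql(String schemaName, String tableName) {
        return "TRUNCATE TABLE \"" + schemaName + "\".\"" + tableName + "\"";
    }

    public static void main(String[] args) {
        System.out.println(existDataSql("public", "pg_ide_sink_table_2"));
        System.out.println(truncateTableSql("public", "pg_ide_sink_table_2"));
    }
}
```

Note that both identifiers are double-quoted, so mixed-case schema and table names survive PostgreSQL's default lower-casing of unquoted identifiers.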
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index 095de999bd..4826b9ce8a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -443,4 +443,131 @@ public class JdbcPostgresIT extends TestSuiteBase implements TestResource {
POSTGRESQL_CONTAINER.stop();
}
}
+
+ @TestTemplate
+ public void testCatalogForSaveMode(TestContainer container)
+ throws IOException, InterruptedException {
+ String schema = "public";
+ String databaseName = POSTGRESQL_CONTAINER.getDatabaseName();
+ TablePath tablePathPG = TablePath.of(databaseName, "public", "pg_e2e_source_table");
+ TablePath tablePathPgSink = TablePath.of(databaseName, "public", "pg_ide_sink_table_2");
+ PostgresCatalog postgresCatalog =
+ new PostgresCatalog(
+ DatabaseIdentifier.POSTGRESQL,
+ POSTGRESQL_CONTAINER.getUsername(),
+ POSTGRESQL_CONTAINER.getPassword(),
+ JdbcUrlUtil.getUrlInfo(POSTGRESQL_CONTAINER.getJdbcUrl()),
+ schema);
+ postgresCatalog.open();
+ CatalogTable catalogTable = postgresCatalog.getTable(tablePathPG);
+ // sink tableExists ?
+ boolean tableExistsBefore = postgresCatalog.tableExists(tablePathPgSink);
+ Assertions.assertFalse(tableExistsBefore);
+ // create table
+ postgresCatalog.createTable(tablePathPgSink, catalogTable, true);
+ boolean tableExistsAfter = postgresCatalog.tableExists(tablePathPgSink);
+ Assertions.assertTrue(tableExistsAfter);
+ // isExistsData ?
+ boolean existsDataBefore = postgresCatalog.isExistsData(tablePathPgSink);
+ Assertions.assertFalse(existsDataBefore);
+ // insert one data
+ String customSql =
+ "INSERT INTO\n"
+ + " pg_ide_sink_table_2 (gid,\n"
+ + " text_col,\n"
+ + " varchar_col,\n"
+ + " char_col,\n"
+ + " boolean_col,\n"
+ + " smallint_col,\n"
+ + " integer_col,\n"
+ + " bigint_col,\n"
+ + " decimal_col,\n"
+ + " numeric_col,\n"
+ + " real_col,\n"
+ + " double_precision_col,\n"
+ + " smallserial_col,\n"
+ + " serial_col,\n"
+ + " bigserial_col,\n"
+ + " date_col,\n"
+ + " timestamp_col,\n"
+ + " bpchar_col,\n"
+ + " age,\n"
+ + " name,\n"
+ + " point,\n"
+ + " linestring,\n"
+ + " polygon_colums,\n"
+ + " multipoint,\n"
+ + " multilinestring,\n"
+ + " multipolygon,\n"
+ + " geometrycollection,\n"
+ + " geog,\n"
+ + " json_col,\n"
+ + " jsonb_col, \n"
+ + " xml_col \n"
+ + " )\n"
+ + "VALUES\n"
+ + " (\n"
+ + " '"
+ + 999
+ + "',\n"
+ + " 'Hello World',\n"
+ + " 'Test',\n"
+ + " 'Testing',\n"
+ + " true,\n"
+ + " 10,\n"
+ + " 100,\n"
+ + " 1000,\n"
+ + " 10.55,\n"
+ + " 8.8888,\n"
+ + " 3.14,\n"
+ + " 3.14159265,\n"
+ + " 1,\n"
+ + " 100,\n"
+ + " 10000,\n"
+ + " '2023-05-07',\n"
+ + " '2023-05-07 14:30:00',\n"
+ + " 'Testing',\n"
+ + " 21,\n"
+ + " 'Leblanc',\n"
+ + " ST_GeomFromText('POINT(-122.3452 47.5925)', 4326),\n"
+ + " ST_GeomFromText(\n"
+ + " 'LINESTRING(-122.3451 47.5924, -122.3449 47.5923)',\n"
+ + " 4326\n"
+ + " ),\n"
+ + " ST_GeomFromText(\n"
+ + " 'POLYGON((-122.3453 47.5922, -122.3453 47.5926, -122.3448 47.5926, -122.3448 47.5922, -122.3453 47.5922))',\n"
+ + " 4326\n"
+ + " ),\n"
+ + " ST_GeomFromText(\n"
+ + " 'MULTIPOINT(-122.3459 47.5927, -122.3445 47.5918)',\n"
+ + " 4326\n"
+ + " ),\n"
+ + " ST_GeomFromText(\n"
+ + " 'MULTILINESTRING((-122.3463 47.5920, -122.3461 47.5919),(-122.3459 47.5924, -122.3457 47.5923))',\n"
+ + " 4326\n"
+ + " ),\n"
+ + " ST_GeomFromText(\n"
+ + " 'MULTIPOLYGON(((-122.3458 47.5925, -122.3458 47.5928, -122.3454 47.5928, -122.3454 47.5925, -122.3458 47.5925)),((-122.3453 47.5921, -122.3453 47.5924, -122.3448 47.5924, -122.3448 47.5921, -122.3453 47.5921)))',\n"
+ + " 4326\n"
+ + " ),\n"
+ + " ST_GeomFromText(\n"
+ + " 'GEOMETRYCOLLECTION(POINT(-122.3462 47.5921), LINESTRING(-122.3460 47.5924, -122.3457 47.5924))',\n"
+ + " 4326\n"
+ + " ),\n"
+ + " ST_GeographyFromText('POINT(-122.3452 47.5925)'),\n"
+ + " '{\"key\":\"test\"}',\n"
+ + " '{\"key\":\"test\"}',\n"
+ + " '<XX:NewSize>test</XX:NewSize>'\n"
+ + " )";
+ postgresCatalog.executeSql(tablePathPgSink, customSql);
+ boolean existsDataAfter = postgresCatalog.isExistsData(tablePathPgSink);
+ Assertions.assertTrue(existsDataAfter);
+ // truncateTable
+ postgresCatalog.truncateTable(tablePathPgSink, true);
+ Assertions.assertFalse(postgresCatalog.isExistsData(tablePathPgSink));
+ // drop table
+ postgresCatalog.dropTable(tablePathPgSink, true);
+ Assertions.assertFalse(postgresCatalog.tableExists(tablePathPgSink));
+ postgresCatalog.close();
+ }
}