This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 84ce516929 [Feature][Connector] update pgsql catalog for save mode 
(#6080)
84ce516929 is described below

commit 84ce516929af7810e4c37905c90728e745527e8c
Author: 老王 <[email protected]>
AuthorDate: Sat Jan 13 11:05:48 2024 +0800

    [Feature][Connector] update pgsql catalog for save mode (#6080)
---
 docs/en/connector-v2/sink/PostgreSql.md            | 103 +++++++++++++----
 .../jdbc/catalog/psql/PostgresCatalog.java         |  13 +++
 .../connectors/seatunnel/jdbc/JdbcPostgresIT.java  | 127 +++++++++++++++++++++
 3 files changed, 220 insertions(+), 23 deletions(-)

diff --git a/docs/en/connector-v2/sink/PostgreSql.md 
b/docs/en/connector-v2/sink/PostgreSql.md
index fc70ad0b55..6164c54942 100644
--- a/docs/en/connector-v2/sink/PostgreSql.md
+++ b/docs/en/connector-v2/sink/PostgreSql.md
@@ -61,29 +61,66 @@ semantics (using XA transaction guarantee).
 
 ## Options
 
-|                   Name                    |  Type   | Required | Default |   
                                                                                
                               Description                                      
                                                                             |
-|-------------------------------------------|---------|----------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| url                                       | String  | Yes      | -       | 
The URL of the JDBC connection. Refer to a case: 
jdbc:postgresql://localhost:5432/test <br/>  if you would use json or jsonb 
type insert please add jdbc url stringtype=unspecified option                   
                                  |
-| driver                                    | String  | Yes      | -       | 
The jdbc class name used to connect to the remote data source,<br/> if you use 
PostgreSQL the value is `org.postgresql.Driver`.                                
                                                                                
|
-| user                                      | String  | No       | -       | 
Connection instance user name                                                   
                                                                                
                                                                               |
-| password                                  | String  | No       | -       | 
Connection instance password                                                    
                                                                                
                                                                               |
-| query                                     | String  | No       | -       | 
Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` 
have the higher priority                                                        
                                                                                
 |
-| database                                  | String  | No       | -       | 
Use this `database` and `table-name` auto-generate sql and receive upstream 
input datas write to database.<br/>This option is mutually exclusive with 
`query` and has a higher priority.                                              
         |
-| table                                     | String  | No       | -       | 
Use database and this table-name auto-generate sql and receive upstream input 
datas write to database.<br/>This option is mutually exclusive with `query` and 
has a higher priority.                                                          
 |
-| primary_keys                              | Array   | No       | -       | 
This option is used to support operations such as `insert`, `delete`, and 
`update` when automatically generate sql.                                       
                                                                                
     |
-| support_upsert_by_query_primary_key_exist | Boolean | No       | false   | 
Choose to use INSERT sql, UPDATE sql to process update events(INSERT, 
UPDATE_AFTER) based on query primary key exists. This configuration is only 
used when database unsupport upsert syntax. **Note**: that this method has low 
performance   |
-| connection_check_timeout_sec              | Int     | No       | 30      | 
The time in seconds to wait for the database operation used to validate the 
connection to complete.                                                         
                                                                                
   |
-| max_retries                               | Int     | No       | 0       | 
The number of retries to submit failed (executeBatch)                           
                                                                                
                                                                               |
-| batch_size                                | Int     | No       | 1000    | 
For batch writing, when the number of buffered records reaches the number of 
`batch_size` or the time reaches `checkpoint.interval`<br/>, the data will be 
flushed into the database                                                       
    |
-| is_exactly_once                           | Boolean | No       | false   | 
Whether to enable exactly-once semantics, which will use Xa transactions. If 
on, you need to<br/>set `xa_data_source_class_name`.                            
                                                                                
  |
-| generate_sink_sql                         | Boolean | No       | false   | 
Generate sql statements based on the database table you want to write to.       
                                                                                
                                                                               |
-| xa_data_source_class_name                 | String  | No       | -       | 
The xa data source class name of the database Driver, for example, PostgreSQL 
is `org.postgresql.xa.PGXADataSource`, and<br/>please refer to appendix for 
other data sources                                                              
     |
-| max_commit_attempts                       | Int     | No       | 3       | 
The number of retries for transaction commit failures                           
                                                                                
                                                                               |
-| transaction_timeout_sec                   | Int     | No       | -1      | 
The timeout after the transaction is opened, the default is -1 (never timeout). 
Note that setting the timeout may affect<br/>exactly-once semantics             
                                                                               |
-| auto_commit                               | Boolean | No       | true    | 
Automatic transaction commit is enabled by default                              
                                                                                
                                                                               |
-| field_ide                                 | String  | No       | -       | 
Identify whether the field needs to be converted when synchronizing from the 
source to the sink. `ORIGINAL` indicates no conversion is needed;`UPPERCASE` 
indicates conversion to uppercase;`LOWERCASE` indicates conversion to 
lowercase.     |
-| properties                                | Map     | No       | -       | 
Additional connection configuration parameters,when properties and URL have the 
same parameters, the priority is determined by the <br/>specific implementation 
of the driver. For example, in MySQL, properties take precedence over the URL. |
-| common-options                            |         | no       | -       | 
Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details                                         
                                                                                
                   |
+|                   Name                    |  Type   | Required |           
Default            |                                                            
                                                                                
                                                                                
                                                        Description             
                                                                                
                [...]
+|-------------------------------------------|---------|----------|------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| url                                       | String  | Yes      | -           
                 | The URL of the JDBC connection. Refer to a case: 
jdbc:postgresql://localhost:5432/test <br/>  if you would use json or jsonb 
type insert please add jdbc url stringtype=unspecified option                   
                                                                                
                                                                                
                              [...]
+| driver                                    | String  | Yes      | -           
                 | The jdbc class name used to connect to the remote data 
source,<br/> if you use PostgreSQL the value is `org.postgresql.Driver`.        
                                                                                
                                                                                
                                                                                
                    [...]
+| user                                      | String  | No       | -           
                 | Connection instance user name                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| password                                  | String  | No       | -           
                 | Connection instance password                                 
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| query                                     | String  | No       | -           
                 | Use this sql write upstream input datas to database. e.g 
`INSERT ...`,`query` have the higher priority                                   
                                                                                
                                                                                
                                                                                
                  [...]
+| database                                  | String  | No       | -           
                 | Use this `database` and `table-name` auto-generate sql and 
receive upstream input datas write to database.<br/>This option is mutually 
exclusive with `query` and has a higher priority.                               
                                                                                
                                                                                
                    [...]
+| table                                     | String  | No       | -           
                 | Use database and this table-name auto-generate sql and 
receive upstream input datas write to database.<br/>This option is mutually 
exclusive with `query` and has a higher priority.The table parameter can fill 
in the name of an unwilling table, which will eventually be used as the table 
name of the creation table, and supports variables (`${table_name}`, 
`${schema_name}`). Replacement rules:  [...]
+| primary_keys                              | Array   | No       | -           
                 | This option is used to support operations such as `insert`, 
`delete`, and `update` when automatically generate sql.                         
                                                                                
                                                                                
                                                                                
               [...]
+| support_upsert_by_query_primary_key_exist | Boolean | No       | false       
                 | Choose to use INSERT sql, UPDATE sql to process update 
events(INSERT, UPDATE_AFTER) based on query primary key exists. This 
configuration is only used when database unsupport upsert syntax. **Note**: 
that this method has low performance                                            
                                                                                
                                   [...]
+| connection_check_timeout_sec              | Int     | No       | 30          
                 | The time in seconds to wait for the database operation used 
to validate the connection to complete.                                         
                                                                                
                                                                                
                                                                                
               [...]
+| max_retries                               | Int     | No       | 0           
                 | The number of retries to submit failed (executeBatch)        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| batch_size                                | Int     | No       | 1000        
                 | For batch writing, when the number of buffered records 
reaches the number of `batch_size` or the time reaches 
`checkpoint.interval`<br/>, the data will be flushed into the database          
                                                                                
                                                                                
                                             [...]
+| is_exactly_once                           | Boolean | No       | false       
                 | Whether to enable exactly-once semantics, which will use Xa 
transactions. If on, you need to<br/>set `xa_data_source_class_name`.           
                                                                                
                                                                                
                                                                                
               [...]
+| generate_sink_sql                         | Boolean | No       | false       
                 | Generate sql statements based on the database table you want 
to write to.                                                                    
                                                                                
                                                                                
                                                                                
              [...]
+| xa_data_source_class_name                 | String  | No       | -           
                 | The xa data source class name of the database Driver, for 
example, PostgreSQL is `org.postgresql.xa.PGXADataSource`, and<br/>please refer 
to appendix for other data sources                                              
                                                                                
                                                                                
                 [...]
+| max_commit_attempts                       | Int     | No       | 3           
                 | The number of retries for transaction commit failures        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| transaction_timeout_sec                   | Int     | No       | -1          
                 | The timeout after the transaction is opened, the default is 
-1 (never timeout). Note that setting the timeout may affect<br/>exactly-once 
semantics                                                                       
                                                                                
                                                                                
                 [...]
+| auto_commit                               | Boolean | No       | true        
                 | Automatic transaction commit is enabled by default           
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| field_ide                                 | String  | No       | -           
                 | Identify whether the field needs to be converted when 
synchronizing from the source to the sink. `ORIGINAL` indicates no conversion 
is needed;`UPPERCASE` indicates conversion to uppercase;`LOWERCASE` indicates 
conversion to lowercase.                                                        
                                                                                
                         [...]
+| properties                                | Map     | No       | -           
                 | Additional connection configuration parameters,when 
properties and URL have the same parameters, the priority is determined by the 
<br/>specific implementation of the driver. For example, in MySQL, properties 
take precedence over the URL.                                                   
                                                                                
                          [...]
+| common-options                            |         | no       | -           
                 | Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details                                         
                                                                                
                                                                                
                                                                                
               [...]
+| schema_save_mode                          | Enum    | no       | 
CREATE_SCHEMA_WHEN_NOT_EXIST | Before the synchronous task is turned on, 
different treatment schemes are selected for the existing surface structure of 
the target side.                                                                
                                                                                
                                                                                
                                  [...]
+| data_save_mode                            | Enum    | no       | APPEND_DATA 
                 | Before the synchronous task is turned on, different 
processing schemes are selected for data existing data on the target side.      
                                                                                
                                                                                
                                                                                
                       [...]
+| custom_sql                                | String  | no       | -           
                 | When data_save_mode selects CUSTOM_PROCESSING, you should 
fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that 
can be executed. SQL will be executed before synchronization tasks.             
                                                                                
                                                                                
                    [...]
+
+### table [string]
+
+Use `database` and this `table-name` auto-generate sql and receive upstream 
input datas write to database.
+
+This option is mutually exclusive with `query` and has a higher priority.
+
+The table parameter can fill in the name of an unwilling table, which will 
eventually be used as the table name of the creation table, and supports 
variables (`${table_name}`, `${schema_name}`). Replacement rules: 
`${schema_name}` will replace the SCHEMA name passed to the target side, and 
`${table_name}` will replace the name of the table passed to the table at the 
target side.
+
+for example:
+1. ${schema_name}.${table_name} _test
+2. dbo.tt_${table_name} _sink
+3. public.sink_table
+
+### schema_save_mode[Enum]
+
+Before the synchronous task is turned on, different treatment schemes are 
selected for the existing surface structure of the target side.  
+Option introduction:  
+`RECREATE_SCHEMA` :Will create when the table does not exist, delete and 
rebuild when the table is saved        
+`CREATE_SCHEMA_WHEN_NOT_EXIST` :Will Created when the table does not exist, 
skipped when the table is saved        
+`ERROR_WHEN_SCHEMA_NOT_EXIST` :Error will be reported when the table does not 
exist
+
+### data_save_mode[Enum]
+
+Before the synchronous task is turned on, different processing schemes are 
selected for data existing data on the target side.  
+Option introduction:  
+`DROP_DATA`: Preserve database structure and delete data  
+`APPEND_DATA`:Preserve database structure, preserve data  
+`CUSTOM_PROCESSING`:User defined processing  
+`ERROR_WHEN_DATA_EXISTS`:When there is data, an error is reported
+
+### custom_sql[String]
+
+When data_save_mode selects CUSTOM_PROCESSING, you should fill in the 
CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be 
executed. SQL will be executed before synchronization tasks.
 
 ### Tips
 
@@ -203,3 +240,23 @@ sink {
 }
 ```
 
+### Save mode function
+
+```
+sink {
+    Jdbc {
+        # if you would use json or jsonb type insert please add jdbc url 
stringtype=unspecified option
+        url = "jdbc:postgresql://localhost:5432/test"
+        driver = org.postgresql.Driver
+        user = root
+        password = 123456
+        
+        generate_sink_sql = true
+        database = test
+        table = "public.test_table"
+        schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
+        data_save_mode="APPEND_DATA"
+    }
+}
+```
+
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
index a164b0e3c8..8bc7932c68 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCatalog.java
@@ -230,6 +230,19 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
         return "CREATE DATABASE \"" + databaseName + "\"";
     }
 
+    public String getExistDataSql(TablePath tablePath) {
+        String schemaName = tablePath.getSchemaName();
+        String tableName = tablePath.getTableName();
+        return String.format("select * from \"%s\".\"%s\" limit 1", 
schemaName, tableName);
+    }
+
+    @Override
+    protected String getTruncateTableSql(TablePath tablePath) {
+        String schemaName = tablePath.getSchemaName();
+        String tableName = tablePath.getTableName();
+        return "TRUNCATE TABLE  \"" + schemaName + "\".\"" + tableName + "\"";
+    }
+
     @Override
     protected String getDropDatabaseSql(String databaseName) {
         return "DROP DATABASE \"" + databaseName + "\"";
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index 095de999bd..4826b9ce8a 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -443,4 +443,131 @@ public class JdbcPostgresIT extends TestSuiteBase 
implements TestResource {
             POSTGRESQL_CONTAINER.stop();
         }
     }
+
+    @TestTemplate
+    public void testCatalogForSaveMode(TestContainer container)
+            throws IOException, InterruptedException {
+        String schema = "public";
+        String databaseName = POSTGRESQL_CONTAINER.getDatabaseName();
+        TablePath tablePathPG = TablePath.of(databaseName, "public", 
"pg_e2e_source_table");
+        TablePath tablePathPgSink = TablePath.of(databaseName, "public", 
"pg_ide_sink_table_2");
+        PostgresCatalog postgresCatalog =
+                new PostgresCatalog(
+                        DatabaseIdentifier.POSTGRESQL,
+                        POSTGRESQL_CONTAINER.getUsername(),
+                        POSTGRESQL_CONTAINER.getPassword(),
+                        
JdbcUrlUtil.getUrlInfo(POSTGRESQL_CONTAINER.getJdbcUrl()),
+                        schema);
+        postgresCatalog.open();
+        CatalogTable catalogTable = postgresCatalog.getTable(tablePathPG);
+        // sink tableExists ?
+        boolean tableExistsBefore = 
postgresCatalog.tableExists(tablePathPgSink);
+        Assertions.assertFalse(tableExistsBefore);
+        // create table
+        postgresCatalog.createTable(tablePathPgSink, catalogTable, true);
+        boolean tableExistsAfter = 
postgresCatalog.tableExists(tablePathPgSink);
+        Assertions.assertTrue(tableExistsAfter);
+        // isExistsData ?
+        boolean existsDataBefore = 
postgresCatalog.isExistsData(tablePathPgSink);
+        Assertions.assertFalse(existsDataBefore);
+        // insert one data
+        String customSql =
+                "INSERT INTO\n"
+                        + "  pg_ide_sink_table_2 (gid,\n"
+                        + "    text_col,\n"
+                        + "    varchar_col,\n"
+                        + "    char_col,\n"
+                        + "    boolean_col,\n"
+                        + "    smallint_col,\n"
+                        + "    integer_col,\n"
+                        + "    bigint_col,\n"
+                        + "    decimal_col,\n"
+                        + "    numeric_col,\n"
+                        + "    real_col,\n"
+                        + "    double_precision_col,\n"
+                        + "    smallserial_col,\n"
+                        + "    serial_col,\n"
+                        + "    bigserial_col,\n"
+                        + "    date_col,\n"
+                        + "    timestamp_col,\n"
+                        + "    bpchar_col,\n"
+                        + "    age,\n"
+                        + "    name,\n"
+                        + "    point,\n"
+                        + "    linestring,\n"
+                        + "    polygon_colums,\n"
+                        + "    multipoint,\n"
+                        + "    multilinestring,\n"
+                        + "    multipolygon,\n"
+                        + "    geometrycollection,\n"
+                        + "    geog,\n"
+                        + "    json_col,\n"
+                        + "    jsonb_col, \n"
+                        + "    xml_col \n"
+                        + "  )\n"
+                        + "VALUES\n"
+                        + "  (\n"
+                        + "    '"
+                        + 999
+                        + "',\n"
+                        + "    'Hello World',\n"
+                        + "    'Test',\n"
+                        + "    'Testing',\n"
+                        + "    true,\n"
+                        + "    10,\n"
+                        + "    100,\n"
+                        + "    1000,\n"
+                        + "    10.55,\n"
+                        + "    8.8888,\n"
+                        + "    3.14,\n"
+                        + "    3.14159265,\n"
+                        + "    1,\n"
+                        + "    100,\n"
+                        + "    10000,\n"
+                        + "    '2023-05-07',\n"
+                        + "    '2023-05-07 14:30:00',\n"
+                        + "    'Testing',\n"
+                        + "    21,\n"
+                        + "    'Leblanc',\n"
+                        + "    ST_GeomFromText('POINT(-122.3452 47.5925)', 
4326),\n"
+                        + "    ST_GeomFromText(\n"
+                        + "      'LINESTRING(-122.3451 47.5924, -122.3449 
47.5923)',\n"
+                        + "      4326\n"
+                        + "    ),\n"
+                        + "    ST_GeomFromText(\n"
+                        + "      'POLYGON((-122.3453 47.5922, -122.3453 
47.5926, -122.3448 47.5926, -122.3448 47.5922, -122.3453 47.5922))',\n"
+                        + "      4326\n"
+                        + "    ),\n"
+                        + "    ST_GeomFromText(\n"
+                        + "      'MULTIPOINT(-122.3459 47.5927, -122.3445 
47.5918)',\n"
+                        + "      4326\n"
+                        + "    ),\n"
+                        + "    ST_GeomFromText(\n"
+                        + "      'MULTILINESTRING((-122.3463 47.5920, 
-122.3461 47.5919),(-122.3459 47.5924, -122.3457 47.5923))',\n"
+                        + "      4326\n"
+                        + "    ),\n"
+                        + "    ST_GeomFromText(\n"
+                        + "      'MULTIPOLYGON(((-122.3458 47.5925, -122.3458 
47.5928, -122.3454 47.5928, -122.3454 47.5925, -122.3458 47.5925)),((-122.3453 
47.5921, -122.3453 47.5924, -122.3448 47.5924, -122.3448 47.5921, -122.3453 
47.5921)))',\n"
+                        + "      4326\n"
+                        + "    ),\n"
+                        + "    ST_GeomFromText(\n"
+                        + "      'GEOMETRYCOLLECTION(POINT(-122.3462 47.5921), 
LINESTRING(-122.3460 47.5924, -122.3457 47.5924))',\n"
+                        + "      4326\n"
+                        + "    ),\n"
+                        + "    ST_GeographyFromText('POINT(-122.3452 
47.5925)'),\n"
+                        + "    '{\"key\":\"test\"}',\n"
+                        + "    '{\"key\":\"test\"}',\n"
+                        + "    '<XX:NewSize>test</XX:NewSize>'\n"
+                        + "  )";
+        postgresCatalog.executeSql(tablePathPgSink, customSql);
+        boolean existsDataAfter = 
postgresCatalog.isExistsData(tablePathPgSink);
+        Assertions.assertTrue(existsDataAfter);
+        // truncateTable
+        postgresCatalog.truncateTable(tablePathPgSink, true);
+        Assertions.assertFalse(postgresCatalog.isExistsData(tablePathPgSink));
+        // drop table
+        postgresCatalog.dropTable(tablePathPgSink, true);
+        Assertions.assertFalse(postgresCatalog.tableExists(tablePathPgSink));
+        postgresCatalog.close();
+    }
 }

Reply via email to