This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9538567159 [Feature][Connector-V2]  jdbc connector supports Kingbase 
database (#4803)
9538567159 is described below

commit 9538567159c260d05242abc850453d0a44acf13f
Author: Nick Young <[email protected]>
AuthorDate: Wed Sep 13 20:04:32 2023 +0800

    [Feature][Connector-V2]  jdbc connector supports Kingbase database (#4803)
---
 docs/en/connector-v2/sink/Jdbc.md                  |   1 +
 docs/en/connector-v2/sink/Kingbase.md              | 168 ++++++++++++++++
 docs/en/connector-v2/source/Jdbc.md                |   1 +
 docs/en/connector-v2/source/Kingbase.md            | 148 ++++++++++++++
 release-note.md                                    |   1 +
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  13 ++
 .../internal/dialect/kingbase/KingbaseDialect.java |  68 +++++++
 .../dialect/kingbase/KingbaseDialectFactory.java   |  38 ++++
 .../dialect/kingbase/KingbaseJdbcRowConverter.java | 187 +++++++++++++++++
 .../dialect/kingbase/KingbaseTypeMapper.java       | 138 +++++++++++++
 .../connectors/seatunnel/jdbc/JdbcKingbaseIT.java  | 223 +++++++++++++++++++++
 .../resources/jdbc_kingbase_source_and_sink.conf   |  43 ++++
 12 files changed, 1029 insertions(+)

diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index f845dea6ef..394fadde80 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -176,6 +176,7 @@ there are some reference value for params above.
 | Redshift   | com.amazon.redshift.jdbc42.Driver            | 
jdbc:redshift://localhost:5439/testdb                              | 
com.amazon.redshift.xa.RedshiftXADataSource        | 
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42          
                            |
 | Snowflake  | net.snowflake.client.jdbc.SnowflakeDriver    | 
jdbc:snowflake://<account_name>.snowflakecomputing.com             | /          
                                        | 
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc                 
                            |
 | Vertica    | com.vertica.jdbc.Driver                      | 
jdbc:vertica://localhost:5433                                      | /          
                                        | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
             |
+| Kingbase   | com.kingbase8.Driver                         | 
jdbc:kingbase8://localhost:54321/db_test                           | /          
                                        | 
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
                          |
 | OceanBase  | com.oceanbase.jdbc.Driver                    | 
jdbc:oceanbase://localhost:2881                                    | /          
                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar
              |
 
 ## Example
diff --git a/docs/en/connector-v2/sink/Kingbase.md 
b/docs/en/connector-v2/sink/Kingbase.md
new file mode 100644
index 0000000000..b92b12fc42
--- /dev/null
+++ b/docs/en/connector-v2/sink/Kingbase.md
@@ -0,0 +1,168 @@
+# Kingbase
+
+> JDBC Kingbase Sink Connector
+
+## Support Connector Version
+
+- 8.6
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [ ] [cdc](../../concept/connector-v2-features.md)
+
+## Description
+
+> Use `Xa transactions` to ensure `exactly-once`, so `exactly-once` is only supported for databases that
+> support `Xa transactions`. You can set `is_exactly_once=true` to enable it. Kingbase currently does not support this.
+
+## Supported DataSource Info
+
+| Datasource | Supported versions |        Driver        |                   
Url                    |                                             Maven      
                                        |
+|------------|--------------------|----------------------|------------------------------------------|------------------------------------------------------------------------------------------------|
+| Kingbase   | 8.6                | com.kingbase8.Driver | 
jdbc:kingbase8://localhost:54321/db_test | 
[Download](https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar)
 |
+
+## Database Dependency
+
+> Please download the driver listed under 'Maven' above and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/'
+> working directory<br/>
+> For example: cp kingbase8-8.6.0.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+|              Kingbase Data type              |                               
                                 SeaTunnel Data type                            
                                    |
+|----------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------|
+| BOOL                                         | BOOLEAN                       
                                                                                
                                    |
+| INT2                                         | SHORT                         
                                                                                
                                    |
+| SMALLSERIAL <br/>SERIAL <br/>INT4            | INT                           
                                                                                
                                    |
+| INT8 <br/>BIGSERIAL                          | BIGINT                        
                                                                                
                                    |
+| FLOAT4                                       | FLOAT                         
                                                                                
                                    |
+| FLOAT8                                       | DOUBLE                        
                                                                                
                                    |
+| NUMERIC                                      | DECIMAL(precision, scale), where precision is the designated column's specified column size<br/>and scale is the designated column's number of digits to the right of the decimal point |
+| BPCHAR <br/>CHARACTER <br/>VARCHAR <br/>TEXT | STRING                        
                                                                                
                                    |
+| TIMESTAMP                                    | LOCALDATETIME                 
                                                                                
                                    |
+| TIME                                         | LOCALTIME                     
                                                                                
                                    |
+| DATE                                         | LOCALDATE                     
                                                                                
                                    |
+| Other data type                              | Not supported yet             
                                                                                
                                    |
+
+## Sink Options
+
+|                   Name                    |  Type   | Required | Default |   
                                                                                
                              Description                                       
                                                                           |
+|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url                                       | String  | Yes      | -       | The URL of the JDBC connection. For example: jdbc:kingbase8://localhost:54321/test |
+| driver                                    | String  | Yes      | -       | The jdbc class name used to connect to the remote data source,<br/> for Kingbase the value is `com.kingbase8.Driver`. |
+| user                                      | String  | No       | -       | 
Connection instance user name                                                   
                                                                                
                                                                             |
+| password                                  | String  | No       | -       | 
Connection instance password                                                    
                                                                                
                                                                             |
+| query                                     | String  | No       | -       | Use this sql to write upstream input data to the database, e.g. `INSERT ...`. `query` has the higher priority |
+| database                                  | String  | No       | -       | Use this `database` and `table-name` to auto-generate sql and write the received upstream input data to the database.<br/>This option is mutually exclusive with `query` and has a higher priority. |
+| table                                     | String  | No       | -       | Use `database` and this `table-name` to auto-generate sql and write the received upstream input data to the database.<br/>This option is mutually exclusive with `query` and has a higher priority. |
+| primary_keys                              | Array   | No       | -       | This option is used to support operations such as `insert`, `delete`, and `update` when sql is generated automatically. |
+| support_upsert_by_query_primary_key_exist | Boolean | No       | false   | Choose to use INSERT sql or UPDATE sql to process update events (INSERT, UPDATE_AFTER) based on whether the queried primary key exists. This configuration is only used when the database does not support upsert syntax. **Note**: this method has low performance |
+| connection_check_timeout_sec              | Int     | No       | 30      | 
The time in seconds to wait for the database operation used to validate the 
connection to complete.                                                         
                                                                                
 |
+| max_retries                               | Int     | No       | 0       | 
The number of retries to submit failed (executeBatch)                           
                                                                                
                                                                             |
+| batch_size                                | Int     | No       | 1000    | For batch writing, when the number of buffered records reaches `batch_size` or the time reaches `checkpoint.interval`,<br/>the data will be flushed into the database |
+| is_exactly_once                           | Boolean | No       | false   | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to<br/>set `xa_data_source_class_name`. Kingbase currently does not support this. |
+| generate_sink_sql                         | Boolean | No       | false   | Generate sql statements based on the database table you want to write to |
+| xa_data_source_class_name                 | String  | No       | -       | The xa data source class name of the database driver. Kingbase currently does not support this. |
+| max_commit_attempts                       | Int     | No       | 3       | 
The number of retries for transaction commit failures                           
                                                                                
                                                                             |
+| transaction_timeout_sec                   | Int     | No       | -1      | 
The timeout after the transaction is opened, the default is -1 (never timeout). 
Note that setting the timeout may affect<br/>exactly-once semantics             
                                                                             |
+| auto_commit                               | Boolean | No       | true    | 
Automatic transaction commit is enabled by default                              
                                                                                
                                                                             |
+| common-options                            |         | No       | -       | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details |
+
+### Tips
+
+> If partition_column is not set, it will run in single concurrency, and if 
partition_column is set, it will be executed
+> in parallel according to the concurrency of tasks.
+
+## Task Example
+
+### Simple:
+
+> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends
+> it to the JDBC sink. FakeSource generates a total of 16 rows of data (row.num=16), with each row having 12 fields. The target table, test_table, will then also contain 16 rows.
+> Before
+> running this job, you need to create the database test and the table test_table in your Kingbase. If you have not yet installed and
+> deployed SeaTunnel, follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md)
+> to
+> install and deploy SeaTunnel. Then follow the instructions
+> in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job.
+
+```
+# Defining the runtime environment
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  FakeSource {
+    parallelism = 1
+    result_table_name = "fake"
+    row.num = 16
+    schema = {
+      fields {
+            c_string = string
+            c_boolean = boolean
+            c_tinyint = tinyint
+            c_smallint = smallint
+            c_int = int
+            c_bigint = bigint
+            c_float = float
+            c_double = double
+            c_decimal = "decimal(30, 8)"
+            c_date = date
+            c_time = time 
+            c_timestamp = timestamp
+      }
+    }
+  }
+  # If you would like to get more information about how to configure seatunnel 
and see full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+  # If you would like to get more information about how to configure seatunnel 
and see full list of transform plugins,
+    # please go to https://seatunnel.apache.org/docs/category/transform-v2
+}
+
+sink {
+    jdbc {
+        url = "jdbc:kingbase8://127.0.0.1:54321/dbname"
+        driver = "com.kingbase8.Driver"
+        user = "root"
+        password = "123456"
+        query = "insert into 
test_table(c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_decimal,c_date,c_time,c_timestamp)
 values(?,?,?,?,?,?,?,?,?,?,?,?)"
+        }
+  # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
+```
+
+### Generate Sink SQL
+
+> This example does not require you to write complex sql statements. You can configure the database name and table name to automatically
+> generate insert statements for you
+
+```
+sink {
+    jdbc {
+        url = "jdbc:kingbase8://127.0.0.1:54321/dbname"
+        driver = "com.kingbase8.Driver"
+        user = "root"
+        password = "123456"
+        # Automatically generate sql statements based on database table names
+        generate_sink_sql = true
+        database = test
+        table = test_table
+    }
+}
+```
+
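Note on the sink documentation above (not part of the patch): the "Generate Sink SQL" example relies on an insert statement being derived from `database`, `table`, and the upstream field names. The Java sketch below shows one way such a statement could be assembled. It is an illustration under stated assumptions, not the connector's actual code: the class `InsertSqlSketch` and the method `buildInsert` are hypothetical names, and the statement the connector really generates may quote or qualify identifiers differently.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, for illustration only; not part of SeaTunnel.
public class InsertSqlSketch {

    // Builds a parameterized INSERT with one "?" placeholder per column.
    // Assumption: names are concatenated as-is, without identifier quoting.
    public static String buildInsert(String database, String table, List<String> columns) {
        if (columns == null || columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        String columnList = String.join(", ", columns);
        String placeholders = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + database + "." + table
                + " (" + columnList + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        // Uses the database and table names from the documentation example.
        System.out.println(buildInsert("test", "test_table", Arrays.asList("c_string", "c_int")));
    }
}
```

With the names from the example, the sketch yields a statement of the same shape as the hand-written `query` in the Simple example, with one placeholder per field.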
diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index 585c2bc002..b86a7b3385 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -125,6 +125,7 @@ there are some reference value for params above.
 | Snowflake  | net.snowflake.client.jdbc.SnowflakeDriver           | 
jdbc:snowflake://<account_name>.snowflakecomputing.com                 | 
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc                 
                            |
 | Redshift   | com.amazon.redshift.jdbc42.Driver                   | 
jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000         | 
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42          
                            |
 | Vertica    | com.vertica.jdbc.Driver                             | 
jdbc:vertica://localhost:5433                                          | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
             |
+| Kingbase   | com.kingbase8.Driver                                | 
jdbc:kingbase8://localhost:54321/db_test                               | 
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
                          |
 | OceanBase  | com.oceanbase.jdbc.Driver                           | 
jdbc:oceanbase://localhost:2881                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar
              |
 
 ## Example
diff --git a/docs/en/connector-v2/source/Kingbase.md 
b/docs/en/connector-v2/source/Kingbase.md
new file mode 100644
index 0000000000..62e280675d
--- /dev/null
+++ b/docs/en/connector-v2/source/Kingbase.md
@@ -0,0 +1,148 @@
+# Kingbase
+
+> JDBC Kingbase Source Connector
+
+## Support Connector Version
+
+- 8.6
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Description
+
+Read external data source data through JDBC.
+
+## Supported DataSource Info
+
+| Datasource | Supported versions |        Driver        |                   
Url                    |                                             Maven      
                                        |
+|------------|--------------------|----------------------|------------------------------------------|------------------------------------------------------------------------------------------------|
+| Kingbase   | 8.6                | com.kingbase8.Driver | 
jdbc:kingbase8://localhost:54321/db_test | 
[Download](https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar)
 |
+
+## Database Dependency
+
+> Please download the driver listed under 'Maven' above and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory<br/>
+> For example: cp kingbase8-8.6.0.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+|            Kingbase Data type             |                                  
                              SeaTunnel Data type                               
                                 |
+|-------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------|
+| BOOL                                      | BOOLEAN                          
                                                                                
                                 |
+| INT2                                      | SHORT                            
                                                                                
                                 |
+| SMALLSERIAL <br/>SERIAL <br/>INT4         | INT                              
                                                                                
                                 |
+| INT8 <br/>BIGSERIAL                       | BIGINT                           
                                                                                
                                 |
+| FLOAT4                                    | FLOAT                            
                                                                                
                                 |
+| FLOAT8                                    | DOUBLE                           
                                                                                
                                 |
+| NUMERIC                                   | DECIMAL(precision, scale), where precision is the designated column's specified column size<br/>and scale is the designated column's number of digits to the right of the decimal point |
+| BPCHAR<br/>CHARACTER<br/>VARCHAR<br/>TEXT | STRING                           
                                                                                
                                 |
+| TIMESTAMP                                 | LOCALDATETIME                    
                                                                                
                                 |
+| TIME                                      | LOCALTIME                        
                                                                                
                                 |
+| DATE                                      | LOCALDATE                        
                                                                                
                                 |
+| Other data type                           | Not supported yet                
                                                                                
                                 |
+
+## Source Options
+
+|             Name             |    Type    | Required |     Default     |     
                                                                                
                                         Description                            
                                                                                
                  |
+|------------------------------|------------|----------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url                          | String     | Yes      | -               | The 
URL of the JDBC connection. Refer to a case: 
jdbc:kingbase8://localhost:54321/test                                           
                                                                                
                                                     |
+| driver                       | String     | Yes      | -               | The 
jdbc class name used to connect to the remote data source, should be 
`com.kingbase8.Driver`.                                                         
                                                                                
                             |
+| user                         | String     | No       | -               | 
Connection instance user name                                                   
                                                                                
                                                                                
                      |
+| password                     | String     | No       | -               | 
Connection instance password                                                    
                                                                                
                                                                                
                      |
+| query                        | String     | Yes      | -               | 
Query statement                                                                 
                                                                                
                                                                                
                      |
+| connection_check_timeout_sec | Int        | No       | 30              | The 
time in seconds to wait for the database operation used to validate the 
connection to complete                                                          
                                                                                
                          |
+| partition_column             | String     | No       | -               | The column name used to partition the parallel read; only numeric and string type columns are supported. |
+| partition_lower_bound        | BigDecimal | No       | -               | The minimum partition_column value for the scan. If not set, SeaTunnel will query the database to get the min value. |
+| partition_upper_bound        | BigDecimal | No       | -               | The maximum partition_column value for the scan. If not set, SeaTunnel will query the database to get the max value. |
+| partition_num                | Int        | No       | job parallelism | The number of partitions; only positive integers are supported. The default value is the job parallelism. |
+| fetch_size                   | Int        | No       | 0               | For queries that return a large number of objects, you can configure <br/> the row fetch size used in the query to improve performance by <br/> reducing the number of database hits required to satisfy the selection criteria.<br/> Zero means use jdbc default value. |
+| common-options               |            | No       | -               | 
Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details                                         
                                                                                
                                      |
+
+### Tips
+
+> If partition_column is not set, the job runs with a single concurrency. If partition_column is set, it is executed in parallel according to the concurrency of tasks.
+
+## Task Example
+
+### Simple:
+
+```
+env {
+  execution.parallelism = 2
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    driver = "com.kingbase8.Driver"
+    url = "jdbc:kingbase8://localhost:54321/db_test"
+    user = "root"
+    password = ""
+    query = "select * from source"
+  }
+}
+
+transform {
+    # If you would like to get more information about how to configure 
seatunnel and see full list of transform plugins,
+    # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+    Console {}
+}
+```
+
+### Parallel:
+
+> Read your query table in parallel with the shard field you configured and 
the shard data. You can do this if you want to read the whole table
+
+```
+source {
+  Jdbc {
+    driver = "com.kingbase8.Driver"
+    url = "jdbc:kingbase8://localhost:54321/db_test"
+    user = "root"
+    password = ""
+    query = "select * from source"
+    # Field used to shard the parallel read
+    partition_column = "id"
+    # Number of fragments
+    partition_num = 10
+  }
+}
+```
+
+### Parallel Boundary:
+
+> It is more efficient to read your data source according to the upper and 
lower boundaries you configured
+
+```
+source {
+  Jdbc {
+    driver = "com.kingbase8.Driver"
+    url = "jdbc:kingbase8://localhost:54321/db_test"
+    user = "root"
+    password = ""
+    query = "select * from source"
+    partition_column = "id"
+    partition_num = 10
+    # Read start boundary
+    partition_lower_bound = 1
+    # Read end boundary
+    partition_upper_bound = 500
+  }
+}
+```
+
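Note on the source documentation above (not part of the patch): the "Parallel Boundary" example reads `id` values between `partition_lower_bound = 1` and `partition_upper_bound = 500` in `partition_num = 10` pieces. The Java sketch below shows one plausible way to cut an inclusive numeric range into near-equal sub-ranges. It is an assumption for illustration, not SeaTunnel's actual splitting logic, and `PartitionRangeSketch`, `Range`, and `split` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only; not part of SeaTunnel.
public class PartitionRangeSketch {

    /** Inclusive [start, end] range of partition_column values. */
    public static final class Range {
        public final long start;
        public final long end;

        Range(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + "]";
        }
    }

    // Splits [lower, upper] into at most partitionNum contiguous ranges.
    // Any remainder is spread over the first ranges, one extra value each.
    public static List<Range> split(long lower, long upper, int partitionNum) {
        if (partitionNum <= 0) {
            throw new IllegalArgumentException("partition_num must be a positive integer");
        }
        if (lower > upper) {
            throw new IllegalArgumentException("lower bound must not exceed upper bound");
        }
        long total = upper - lower + 1;
        long base = total / partitionNum;
        long remainder = total % partitionNum;
        List<Range> ranges = new ArrayList<>();
        long start = lower;
        // The loop stops early when there are fewer values than partitions.
        for (int i = 0; i < partitionNum && start <= upper; i++) {
            long size = base + (i < remainder ? 1 : 0);
            long end = start + size - 1;
            ranges.add(new Range(start, end));
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Bounds and partition count taken from the documentation example.
        for (Range range : split(1, 500, 10)) {
            System.out.println(range);
        }
    }
}
```

Each resulting range could then back a query of the form `select * from source where id >= ? and id <= ?`, one per parallel reader. Supplying the bounds up front avoids the extra min/max lookup that the options table describes for unset bounds.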
diff --git a/release-note.md b/release-note.md
index 1b797ff315..61664d773f 100644
--- a/release-note.md
+++ b/release-note.md
@@ -159,6 +159,7 @@
 - [Connector-V2] [Paimon] Introduce paimon connector (#4178)
 - [Connector V2] [Cassandra] Expose configurable options in Cassandra (#3681)
 - [Connector V2] [Jdbc] Supports GEOMETRY data type for PostgreSQL (#4673)
+- [Connector V2] [Jdbc] Supports Kingbase database (#4803)
 - [Transform-V2] Add UDF SPI and an example implement for SQL Transform plugin 
(#4392)
 - [Transform-V2] Support copy field list (#4404)
 - [Transform-V2] Add support CatalogTable for FieldMapperTransform (#4423)
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml 
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index e76237e7e0..62d541d19f 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -46,6 +46,7 @@
         <snowflake.version>3.13.29</snowflake.version>
         <vertica.version>12.0.3-0</vertica.version>
         <postgis.jdbc.version>2.5.1</postgis.jdbc.version>
+        <kingbase8.version>8.6.0</kingbase8.version>
     </properties>
 
     <dependencyManagement>
@@ -143,6 +144,12 @@
                 <version>${vertica.version}</version>
                 <scope>provided</scope>
             </dependency>
+            <dependency>
+                <groupId>cn.com.kingbase</groupId>
+                <artifactId>kingbase8</artifactId>
+                <version>${kingbase8.version}</version>
+                <scope>provided</scope>
+            </dependency>
         </dependencies>
     </dependencyManagement>
 
@@ -218,5 +225,11 @@
             <groupId>com.vertica.jdbc</groupId>
             <artifactId>vertica-jdbc</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>cn.com.kingbase</groupId>
+            <artifactId>kingbase8</artifactId>
+        </dependency>
+
     </dependencies>
 </project>
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java
new file mode 100644
index 0000000000..2f6d566106
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class KingbaseDialect implements JdbcDialect {
+
+    @Override
+    public String dialectName() {
+        return "Kingbase";
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new KingbaseJdbcRowConverter();
+    }
+
+    @Override
+    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
+        return new KingbaseTypeMapper();
+    }
+
+    @Override
+    public Optional<String> getUpsertStatement(
+            String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        String uniqueColumns =
+                Arrays.stream(uniqueKeyFields)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String updateClause =
+                Arrays.stream(fieldNames)
+                        .map(
+                                fieldName ->
+                                        quoteIdentifier(fieldName)
+                                                + "=EXCLUDED."
+                                                + quoteIdentifier(fieldName))
+                        .collect(Collectors.joining(", "));
+        String upsertSQL =
+                String.format(
+                        "%s ON CONFLICT (%s) DO UPDATE SET %s",
+                        getInsertIntoStatement(database, tableName, fieldNames),
+                        uniqueColumns,
+                        updateClause);
+        return Optional.of(upsertSQL);
+    }
+}
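Reviewer note on `getUpsertStatement` above: the statement it builds follows the PostgreSQL-style `INSERT ... ON CONFLICT (...) DO UPDATE SET col=EXCLUDED.col` form. The sketch below reproduces the same string assembly in a standalone class so the resulting SQL shape is easy to inspect. `quote` and `insertInto` are hypothetical stand-ins for `quoteIdentifier` and `getInsertIntoStatement`; their real implementations live in `JdbcDialect`, are not part of this diff, and may quote identifiers or render placeholders differently.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

class KingbaseUpsertSketch {

    // Hypothetical stand-in for JdbcDialect#quoteIdentifier (double-quote quoting is an assumption).
    static String quote(String identifier) {
        return "\"" + identifier + "\"";
    }

    // Hypothetical stand-in for JdbcDialect#getInsertIntoStatement ("?" placeholders are an assumption).
    static String insertInto(String table, String[] fieldNames) {
        String columns =
                Arrays.stream(fieldNames)
                        .map(KingbaseUpsertSketch::quote)
                        .collect(Collectors.joining(", "));
        String placeholders =
                Arrays.stream(fieldNames).map(f -> "?").collect(Collectors.joining(", "));
        return String.format("INSERT INTO %s (%s) VALUES (%s)", quote(table), columns, placeholders);
    }

    // Same assembly as the diff: conflict target from the unique keys, update clause from all fields.
    static String upsert(String table, String[] fieldNames, String[] uniqueKeyFields) {
        String uniqueColumns =
                Arrays.stream(uniqueKeyFields)
                        .map(KingbaseUpsertSketch::quote)
                        .collect(Collectors.joining(", "));
        String updateClause =
                Arrays.stream(fieldNames)
                        .map(f -> quote(f) + "=EXCLUDED." + quote(f))
                        .collect(Collectors.joining(", "));
        return String.format(
                "%s ON CONFLICT (%s) DO UPDATE SET %s",
                insertInto(table, fieldNames), uniqueColumns, updateClause);
    }

    public static void main(String[] args) {
        System.out.println(upsert("sink", new String[] {"id", "name"}, new String[] {"id"}));
    }
}
```

Note that the update clause iterates over every field, so the unique key columns are also reassigned to their incoming values; that is harmless but redundant.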
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java
new file mode 100644
index 0000000000..f999861035
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;
+
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/** Factory for {@link KingbaseDialect}. */
+@AutoService(JdbcDialectFactory.class)
+public class KingbaseDialectFactory implements JdbcDialectFactory {
+
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:kingbase8:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new KingbaseDialect();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
new file mode 100644
index 0000000000..9577e12f62
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Optional;
+
+public class KingbaseJdbcRowConverter extends AbstractJdbcRowConverter {
+
+    @Override
+    public String converterName() {
+        return "KingBase";
+    }
+
+    @Override
+    @SuppressWarnings("checkstyle:Indentation")
+    public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException {
+        Object[] fields = new Object[typeInfo.getTotalFields()];
+        for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) {
+            SeaTunnelDataType<?> seaTunnelDataType = typeInfo.getFieldType(fieldIndex);
+            int resultSetIndex = fieldIndex + 1;
+            switch (seaTunnelDataType.getSqlType()) {
+                case STRING:
+                    fields[fieldIndex] = rs.getString(resultSetIndex);
+                    break;
+                case BOOLEAN:
+                    fields[fieldIndex] = rs.getBoolean(resultSetIndex);
+                    break;
+                case TINYINT:
+                    fields[fieldIndex] = rs.getByte(resultSetIndex);
+                    break;
+                case SMALLINT:
+                    fields[fieldIndex] = rs.getShort(resultSetIndex);
+                    break;
+                case INT:
+                    fields[fieldIndex] = rs.getInt(resultSetIndex);
+                    break;
+                case BIGINT:
+                    fields[fieldIndex] = rs.getLong(resultSetIndex);
+                    break;
+                case FLOAT:
+                    fields[fieldIndex] = rs.getFloat(resultSetIndex);
+                    break;
+                case DOUBLE:
+                    fields[fieldIndex] = rs.getDouble(resultSetIndex);
+                    break;
+                case DECIMAL:
+                    fields[fieldIndex] = rs.getBigDecimal(resultSetIndex);
+                    break;
+                case DATE:
+                    Date sqlDate = rs.getDate(resultSetIndex);
+                    fields[fieldIndex] =
+                            Optional.ofNullable(sqlDate).map(Date::toLocalDate).orElse(null);
+                    break;
+                case TIME:
+                    Time sqlTime = rs.getTime(resultSetIndex);
+                    fields[fieldIndex] =
+                            Optional.ofNullable(sqlTime).map(Time::toLocalTime).orElse(null);
+                    break;
+                case TIMESTAMP:
+                    Timestamp sqlTimestamp = rs.getTimestamp(resultSetIndex);
+                    fields[fieldIndex] =
+                            Optional.ofNullable(sqlTimestamp)
+                                    .map(Timestamp::toLocalDateTime)
+                                    .orElse(null);
+                    break;
+                case BYTES:
+                    fields[fieldIndex] = rs.getBytes(resultSetIndex);
+                    break;
+                case NULL:
+                    fields[fieldIndex] = null;
+                    break;
+                case ROW:
+                case MAP:
+                case ARRAY:
+                default:
+                    throw new JdbcConnectorException(
+                            CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                            "Unexpected value: " + seaTunnelDataType);
+            }
+        }
+        return new SeaTunnelRow(fields);
+    }
+
+    @Override
+    public PreparedStatement toExternal(
+            SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement)
+            throws SQLException {
+        for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) {
+            SeaTunnelDataType<?> seaTunnelDataType = rowType.getFieldType(fieldIndex);
+            int statementIndex = fieldIndex + 1;
+            Object fieldValue = row.getField(fieldIndex);
+            if (fieldValue == null) {
+                statement.setObject(statementIndex, null);
+                continue;
+            }
+
+            switch (seaTunnelDataType.getSqlType()) {
+                case STRING:
+                    statement.setString(statementIndex, (String) row.getField(fieldIndex));
+                    break;
+                case BOOLEAN:
+                    statement.setBoolean(statementIndex, (Boolean) row.getField(fieldIndex));
+                    break;
+                case TINYINT:
+                    statement.setByte(statementIndex, (Byte) row.getField(fieldIndex));
+                    break;
+                case SMALLINT:
+                    statement.setShort(statementIndex, (Short) row.getField(fieldIndex));
+                    break;
+                case INT:
+                    statement.setInt(statementIndex, (Integer) row.getField(fieldIndex));
+                    break;
+                case BIGINT:
+                    statement.setLong(statementIndex, (Long) row.getField(fieldIndex));
+                    break;
+                case FLOAT:
+                    statement.setFloat(statementIndex, (Float) row.getField(fieldIndex));
+                    break;
+                case DOUBLE:
+                    statement.setDouble(statementIndex, (Double) row.getField(fieldIndex));
+                    break;
+                case DECIMAL:
+                    statement.setBigDecimal(statementIndex, (BigDecimal) row.getField(fieldIndex));
+                    break;
+                case DATE:
+                    LocalDate localDate = (LocalDate) row.getField(fieldIndex);
+                    statement.setDate(statementIndex, java.sql.Date.valueOf(localDate));
+                    break;
+                case TIME:
+                    LocalTime localTime = (LocalTime) row.getField(fieldIndex);
+                    statement.setTime(statementIndex, java.sql.Time.valueOf(localTime));
+                    break;
+                case TIMESTAMP:
+                    LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex);
+                    statement.setTimestamp(
+                            statementIndex, java.sql.Timestamp.valueOf(localDateTime));
+                    break;
+                case BYTES:
+                    statement.setBytes(statementIndex, (byte[]) row.getField(fieldIndex));
+                    break;
+                case NULL:
+                    statement.setNull(statementIndex, java.sql.Types.NULL);
+                    break;
+                case ROW:
+                case MAP:
+                case ARRAY:
+                default:
+                    throw new JdbcConnectorException(
+                            CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                            "Unexpected value: " + seaTunnelDataType);
+            }
+        }
+        return statement;
+    }
+}
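Reviewer note on the DATE/TIME/TIMESTAMP branches above: the converter moves values through `java.sql.Date`, `java.sql.Time` and `java.sql.Timestamp` in both directions. A minimal standalone sketch of that round trip, using only JDK classes, shows one consequence worth knowing: `java.sql.Time.valueOf(LocalTime)` keeps hour, minute and second only, so sub-second precision is dropped before the value reaches the driver. The class and method names are hypothetical, and nothing here exercises the Kingbase driver or column types.

```java
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

class TemporalRoundTripSketch {

    // Mirrors the DATE branches: LocalDate -> java.sql.Date -> LocalDate.
    static LocalDate roundTrip(LocalDate value) {
        return Date.valueOf(value).toLocalDate();
    }

    // Mirrors the TIME branches: LocalTime -> java.sql.Time -> LocalTime.
    static LocalTime roundTrip(LocalTime value) {
        return Time.valueOf(value).toLocalTime();
    }

    // Mirrors the TIMESTAMP branches: LocalDateTime -> java.sql.Timestamp -> LocalDateTime.
    static LocalDateTime roundTrip(LocalDateTime value) {
        return Timestamp.valueOf(value).toLocalDateTime();
    }

    public static void main(String[] args) {
        LocalTime withFraction = LocalTime.of(12, 34, 56, 789_000_000);
        System.out.println(roundTrip(LocalDate.of(2023, 9, 13)));
        System.out.println(roundTrip(withFraction));
        System.out.println(roundTrip(LocalDateTime.of(2023, 9, 13, 20, 4, 32, 123_000_000)));
    }
}
```

This matters for the e2e data in this commit, which uses `LocalTime.now()`: any comparison of source and sink rows would have to tolerate the time value being truncated to whole seconds.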
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeMapper.java
new file mode 100644
index 0000000000..439c8fc420
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeMapper.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+public class KingbaseTypeMapper implements JdbcDialectTypeMapper {
+
+    private static final String KB_SMALLSERIAL = "SMALLSERIAL";
+    private static final String KB_SERIAL = "SERIAL";
+    private static final String KB_BIGSERIAL = "BIGSERIAL";
+    private static final String KB_BYTEA = "BYTEA";
+    private static final String KB_BYTEA_ARRAY = "_BYTEA";
+    private static final String KB_SMALLINT = "INT2";
+    private static final String KB_SMALLINT_ARRAY = "_INT2";
+    private static final String KB_INTEGER = "INT4";
+    private static final String KB_INTEGER_ARRAY = "_INT4";
+    private static final String KB_BIGINT = "INT8";
+    private static final String KB_BIGINT_ARRAY = "_INT8";
+    private static final String KB_REAL = "FLOAT4";
+    private static final String KB_REAL_ARRAY = "_FLOAT4";
+    private static final String KB_DOUBLE_PRECISION = "FLOAT8";
+    private static final String KB_DOUBLE_PRECISION_ARRAY = "_FLOAT8";
+    private static final String KB_NUMERIC = "NUMERIC";
+    private static final String KB_NUMERIC_ARRAY = "_NUMERIC";
+    private static final String KB_BOOLEAN = "BOOL";
+    private static final String KB_BOOLEAN_ARRAY = "_BOOL";
+    private static final String KB_TIMESTAMP = "TIMESTAMP";
+    private static final String KB_TIMESTAMP_ARRAY = "_TIMESTAMP";
+    private static final String KB_TIMESTAMPTZ = "TIMESTAMPTZ";
+    private static final String KB_TIMESTAMPTZ_ARRAY = "_TIMESTAMPTZ";
+    private static final String KB_DATE = "DATE";
+    private static final String KB_DATE_ARRAY = "_DATE";
+    private static final String KB_TIME = "TIME";
+    private static final String KB_TIME_ARRAY = "_TIME";
+    private static final String KB_TEXT = "TEXT";
+    private static final String KB_TEXT_ARRAY = "_TEXT";
+    private static final String KB_CHAR = "BPCHAR";
+    private static final String KB_CHAR_ARRAY = "_BPCHAR";
+    private static final String KB_CHARACTER = "CHARACTER";
+
+    private static final String KB_CHARACTER_VARYING = "VARCHAR";
+    private static final String KB_CHARACTER_VARYING_ARRAY = "_VARCHAR";
+    private static final String KB_JSON = "JSON";
+    private static final String KB_JSONB = "JSONB";
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    @Override
+    public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+
+        String kbType = metadata.getColumnTypeName(colIndex).toUpperCase();
+
+        int precision = metadata.getPrecision(colIndex);
+
+        switch (kbType) {
+            case KB_BOOLEAN:
+                return BasicType.BOOLEAN_TYPE;
+            case KB_SMALLINT:
+                return BasicType.SHORT_TYPE;
+            case KB_SMALLSERIAL:
+            case KB_INTEGER:
+            case KB_SERIAL:
+                return BasicType.INT_TYPE;
+            case KB_BIGINT:
+            case KB_BIGSERIAL:
+                return BasicType.LONG_TYPE;
+            case KB_REAL:
+                return BasicType.FLOAT_TYPE;
+            case KB_DOUBLE_PRECISION:
+                return BasicType.DOUBLE_TYPE;
+            case KB_NUMERIC:
+                // see SPARK-26538: handle numeric without explicit precision and scale.
+                if (precision > 0) {
+                    return new DecimalType(precision, metadata.getScale(colIndex));
+                }
+                return new DecimalType(38, 18);
+            case KB_CHAR:
+            case KB_CHARACTER:
+            case KB_CHARACTER_VARYING:
+            case KB_TEXT:
+                return BasicType.STRING_TYPE;
+            case KB_TIMESTAMP:
+                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+            case KB_TIME:
+                return LocalTimeType.LOCAL_TIME_TYPE;
+            case KB_DATE:
+                return LocalTimeType.LOCAL_DATE_TYPE;
+            case KB_CHAR_ARRAY:
+            case KB_CHARACTER_VARYING_ARRAY:
+            case KB_TEXT_ARRAY:
+            case KB_DOUBLE_PRECISION_ARRAY:
+            case KB_REAL_ARRAY:
+            case KB_BIGINT_ARRAY:
+            case KB_SMALLINT_ARRAY:
+            case KB_INTEGER_ARRAY:
+            case KB_BYTEA_ARRAY:
+            case KB_BOOLEAN_ARRAY:
+            case KB_TIMESTAMP_ARRAY:
+            case KB_NUMERIC_ARRAY:
+            case KB_TIMESTAMPTZ:
+            case KB_TIMESTAMPTZ_ARRAY:
+            case KB_TIME_ARRAY:
+            case KB_DATE_ARRAY:
+            case KB_JSONB:
+            case KB_JSON:
+            case KB_BYTEA:
+            default:
+                throw new JdbcConnectorException(
+                        CommonErrorCode.UNSUPPORTED_OPERATION,
+                        String.format("Doesn't support KingBaseES type '%s' yet", kbType));
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java
new file mode 100644
index 0000000000..17d53bb87d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java
@@ -0,0 +1,223 @@
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.Disabled;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * If you want to run this e2e, you need to download km license from
+ * https://www.kingbase.com.cn/sqwjxz/index.htm and modify the KM_LICENSE_PATH variable to the
+ * address where you downloaded the certificate. Also, remove the @Disabled annotation. The spark
+ * engine does not support the TIME type. Two environment variables need to be added to the spark
+ * container: "LANG"="C.UTF-8", "JAVA_TOOL_OPTIONS"="-Dfile.encoding=UTF8"
+ */
+@Slf4j
+@Disabled("Due to copyright reasons, you need to download the trial version km license yourself")
+public class JdbcKingbaseIT extends AbstractJdbcIT {
+    private static final String KINGBASE_IMAGE = "huzhihui/kingbase:v8r6";
+    private static final String KINGBASE_CONTAINER_HOST = "e2e_KINGBASEDb";
+    private static final String KINGBASE_DATABASE = "test";
+    private static final String KINGBASE_SCHEMA = "public";
+    private static final String KINGBASE_SOURCE = "e2e_table_source";
+    private static final String KINGBASE_SINK = "e2e_table_sink";
+
+    private static final String KINGBASE_USERNAME = "SYSTEM";
+    private static final String KINGBASE_PASSWORD = "123456";
+    private static final int KINGBASE_PORT = 54321;
+    private static final String KINGBASE_URL = "jdbc:kingbase8://" + HOST + ":%s/test";
+    private static final String DRIVER_CLASS = "com.kingbase8.Driver";
+    private static final String KM_LICENSE_PATH = "KM_LICENSE_PATH";
+
+    private static final List<String> CONFIG_FILE =
+            Lists.newArrayList("/jdbc_kingbase_source_and_sink.conf");
+    private static final String CREATE_SQL =
+            "create table %s \n"
+                    + "(\n"
+                    + "    c1  SMALLSERIAL,\n"
+                    + "    c2  SERIAL,\n"
+                    + "    c3  BIGSERIAL,\n"
+                    + "    c5  INT2,\n"
+                    + "    c7  INT4,\n"
+                    + "    c9 INT8,\n"
+                    + "    c11 FLOAT4,\n"
+                    + "    c13 FLOAT8,\n"
+                    + "    c15 NUMERIC,\n"
+                    + "    c16 BOOL,\n"
+                    + "    c18 TIMESTAMP,\n"
+                    + "    c19 DATE,\n"
+                    + "    c20 TIME,\n"
+                    + "    c21 TEXT,\n"
+                    + "    c23 BPCHAR,\n"
+                    + "    c25 CHARACTER,\n"
+                    + "    c26 VARCHAR\n"
+                    + ");\n";
+
+    @Override
+    JdbcCase getJdbcCase() {
+        Map<String, String> containerEnv = new HashMap<>();
+        String jdbcUrl = String.format(KINGBASE_URL, KINGBASE_PORT);
+        Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+        String[] fieldNames = testDataSet.getKey();
+
+        String insertSql = insertTable(KINGBASE_SCHEMA, KINGBASE_SOURCE, fieldNames);
+
+        return JdbcCase.builder()
+                .dockerImage(KINGBASE_IMAGE)
+                .networkAliases(KINGBASE_CONTAINER_HOST)
+                .containerEnv(containerEnv)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(KINGBASE_PORT)
+                .localPort(KINGBASE_PORT)
+                .jdbcTemplate(KINGBASE_URL)
+                .jdbcUrl(jdbcUrl)
+                .userName(KINGBASE_USERNAME)
+                .password(KINGBASE_PASSWORD)
+                .database(KINGBASE_DATABASE)
+                .sourceTable(KINGBASE_SOURCE)
+                .sinkTable(KINGBASE_SINK)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .insertSql(insertSql)
+                .testData(testDataSet)
+                .build();
+    }
+
+    @Override
+    void compareResult() throws SQLException, IOException {}
+
+    @Override
+    String driverUrl() {
+        return "https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar";
+    }
+
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        String[] fieldNames =
+                new String[] {
+                    "c1", "c2", "c3", "c5", "c7", "c9", "c11", "c13", "c15", "c16", "c18", "c19",
+                    "c20", "c21", "c23", "c25", "c26"
+                };
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                i,
+                                Long.parseLong(String.valueOf(i)),
+                                Long.parseLong(String.valueOf(i)),
+                                (short) i,
+                                i,
+                                Long.parseLong(String.valueOf(i)),
+                                Float.parseFloat("1.1"),
+                                Double.parseDouble("1.1"),
+                                BigDecimal.valueOf(i, 10),
+                                true,
+                                LocalDateTime.now(),
+                                LocalDate.now(),
+                                LocalTime.now(),
+                                String.valueOf(i),
+                                String.valueOf(i),
+                                String.valueOf(1),
+                                String.valueOf(i)
+                            });
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+
+    @Override
+    GenericContainer<?> initContainer() {
+        GenericContainer<?> container =
+                new GenericContainer<>(KINGBASE_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(KINGBASE_CONTAINER_HOST)
+                        .withEnv("KINGBASE_SYSTEM_PASSWORD", "123456")
+                        .withFileSystemBind(KM_LICENSE_PATH, "/home/kingbase/license.dat")
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(KINGBASE_IMAGE)));
+        container.setPortBindings(
+                Lists.newArrayList(String.format("%s:%s", KINGBASE_PORT, KINGBASE_PORT)));
+        return container;
+    }
+
+    protected void createNeededTables() {
+        try (Statement statement = connection.createStatement()) {
+            String createTemplate = jdbcCase.getCreateSql();
+
+            String createSource =
+                    String.format(
+                            createTemplate, KINGBASE_SCHEMA + "." + jdbcCase.getSourceTable());
+            String createSink =
+                    String.format(createTemplate, KINGBASE_SCHEMA + "." + jdbcCase.getSinkTable());
+
+            statement.execute(createSource);
+            statement.execute(createSink);
+
+            connection.commit();
+        } catch (Exception exception) {
+            log.error(ExceptionUtils.getMessage(exception));
+            throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception);
+        }
+    }
+
+    public String insertTable(String schema, String table, String... fields) {
+        String columns = String.join(", ", fields);
+        String placeholders = Arrays.stream(fields).map(f -> "?").collect(Collectors.joining(", "));
+
+        return "INSERT INTO "
+                + schema
+                + "."
+                + table
+                + " ("
+                + columns
+                + " )"
+                + " VALUES ("
+                + placeholders
+                + ")";
+    }
+
+    public void clearTable(String schema, String table) {}
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_kingbase_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_kingbase_source_and_sink.conf
new file mode 100644
index 0000000000..326fc72724
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_kingbase_source_and_sink.conf
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source{
+    jdbc{
+        driver = "com.kingbase8.Driver"
+        url = "jdbc:kingbase8://e2e_KINGBASEDb:54321/test"
+        user = "SYSTEM"
+        password = "123456"
+        query ="select * from public.e2e_table_source"
+    }
+}
+
+
+sink {
+    jdbc{
+        driver = "com.kingbase8.Driver"
+        url = "jdbc:kingbase8://e2e_KINGBASEDb:54321/test"
+        user = "SYSTEM"
+        password = "123456"
+        query ="INSERT INTO public.e2e_table_sink (c1, c2, c3, c5, c7, c9, c11, c13, c15, c16, c18, c19, c20, c21, c23, c25, c26) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
+    }
+}
+
