This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 92c62c5e63 [Improve][Connector-V2] Optimize dialect selection in jdbc 
(#8820)
92c62c5e63 is described below

commit 92c62c5e63d673fbf10780c6a81e1d4a415c50fc
Author: corgy-w <[email protected]>
AuthorDate: Wed Apr 2 13:52:29 2025 +0800

    [Improve][Connector-V2] Optimize dialect selection in jdbc (#8820)
---
 docs/en/connector-v2/sink/Jdbc.md                  | 22 ++++-
 docs/en/connector-v2/source/Jdbc.md                | 19 +++++
 docs/zh/connector-v2/sink/Jdbc.md                  | 72 +++++++++++------
 .../jdbc/config/JdbcConnectionConfig.java          | 93 ++++++++--------------
 .../seatunnel/jdbc/config/JdbcOptions.java         |  7 ++
 .../seatunnel/jdbc/internal/JdbcInputFormat.java   |  4 +-
 .../jdbc/internal/dialect/DatabaseIdentifier.java  |  1 +
 .../internal/dialect/GenericDialectFactory.java    |  5 ++
 .../jdbc/internal/dialect/JdbcDialectFactory.java  |  6 ++
 .../jdbc/internal/dialect/JdbcDialectLoader.java   | 24 ++++--
 .../internal/dialect/db2/DB2DialectFactory.java    |  6 ++
 .../internal/dialect/dm/DmdbDialectFactory.java    |  6 ++
 .../dialect/gbase8a/Gbase8aDialectFactory.java     |  6 ++
 .../dialect/greenplum/GreenplumDialectFactory.java |  6 ++
 .../dialect/highgo/HighGoDialectFactory.java       |  6 ++
 .../internal/dialect/hive/HiveDialectFactory.java  |  6 ++
 .../dialect/hive/HiveJdbcConnectionProvider.java   |  8 +-
 .../internal/dialect/iris/IrisDialectFactory.java  |  6 ++
 .../dialect/kingbase/KingbaseDialectFactory.java   |  6 ++
 .../dialect/mysql/MySqlDialectFactory.java         |  5 ++
 .../dialect/oceanbase/OceanBaseDialectFactory.java |  6 ++
 .../dialect/opengauss/OpenGaussDialectFactory.java |  6 ++
 .../dialect/oracle/OracleDialectFactory.java       |  6 ++
 .../dialect/phoenix/PhoenixDialectFactory.java     |  6 ++
 .../dialect/psql/PostgresDialectFactory.java       |  6 ++
 .../dialect/redshift/RedshiftDialectFactory.java   |  6 ++
 .../dialect/saphana/SapHanaDialectFactory.java     |  6 ++
 .../dialect/snowflake/SnowflakeDialectFactory.java |  6 ++
 .../dialect/sqlite/SqliteDialectFactory.java       |  6 ++
 .../dialect/sqlserver/SqlServerDialectFactory.java |  6 ++
 .../tablestore/TablestoreDialectFactory.java       |  6 ++
 .../dialect/teradata/TeradataDialectFactory.java   |  6 ++
 .../dialect/vertica/VerticaDialectFactory.java     |  6 ++
 .../internal/dialect/xugu/XuguDialectFactory.java  |  6 ++
 .../seatunnel/jdbc/sink/JdbcSinkFactory.java       |  5 +-
 .../seatunnel/jdbc/source/ChunkSplitter.java       |  4 +-
 .../seatunnel/jdbc/source/JdbcSourceFactory.java   |  5 +-
 .../seatunnel/jdbc/utils/HiveJdbcUtils.java        |  6 +-
 .../seatunnel/jdbc/utils/JdbcCatalogUtils.java     |  4 +-
 .../internal/dialect/JdbcDialectLoaderTest.java    | 18 ++++-
 40 files changed, 329 insertions(+), 111 deletions(-)

diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index 1508ccd777..5ad6735609 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -31,7 +31,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` 
to enable it.
 
 ## Options
 
-|                   Name                    |  Type   | Required |           
Default            |
+| Name                                      | Type    | Required | Default     
                 |
 
|-------------------------------------------|---------|----------|------------------------------|
 | url                                       | String  | Yes      | -           
                 |
 | driver                                    | String  | Yes      | -           
                 |
@@ -39,6 +39,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` 
to enable it.
 | password                                  | String  | No       | -           
                 |
 | query                                     | String  | No       | -           
                 |
 | compatible_mode                           | String  | No       | -           
                 |
+| dialect                                   | String  | No       | -           
                 | 
 | database                                  | String  | No       | -           
                 |
 | table                                     | String  | No       | -           
                 |
 | primary_keys                              | Array   | No       | -           
                 |
@@ -90,6 +91,23 @@ For example, when using OceanBase database, you need to set 
it to 'mysql' or 'or
 
 Postgres 9.5 version or below,please set it to `postgresLow` to support cdc
 
+### dialect [string]
+
+Specifies the dialect to use. If it does not exist, the dialect is still 
determined from the url; otherwise it takes priority over the url. For 
example, when using starrocks, you need to set it to `starrocks`. Similarly, 
when using mysql, you need to set its value to `mysql`.
+
+#### dialect list
+
+|           | Dialect Name |          |
+|-----------|--------------|----------|
+| Greenplum | DB2          | Dameng   |
+| Gbase8a   | HIVE         | KingBase |
+| MySQL     | StarRocks    | Oracle   |
+| Phoenix   | Postgres     | Redshift |
+| SapHana   | Snowflake    | Sqlite   |
+| SqlServer | Tablestore   | Teradata |
+| Vertica   | OceanBase    | XUGU     |
+| IRIS      | Inceptor     | Highgo   |
+
 ### database [string]
 
 Use this `database` and `table-name` auto-generate sql and receive upstream 
input datas write to database.
@@ -105,11 +123,13 @@ This option is mutually exclusive with `query` and has a 
higher priority.
 The table parameter can fill in the name of an unwilling table, which will 
eventually be used as the table name of the creation table, and supports 
variables (`${table_name}`, `${schema_name}`). Replacement rules: 
`${schema_name}` will replace the SCHEMA name passed to the target side, and 
`${table_name}` will replace the name of the table passed to the table at the 
target side.
 
 mysql sink for example:
+
 1. test_${schema_name}_${table_name}_test
 2. sink_sinktable
 3. ss_${table_name}
 
 pgsql (Oracle Sqlserver ...) Sink for example:
+
 1. ${schema_name}.${table_name} _test
 2. dbo.tt_${table_name} _sink
 3. public.sink_table
diff --git a/docs/en/connector-v2/source/Jdbc.md 
b/docs/en/connector-v2/source/Jdbc.md
index bc5e39e0a8..ab79a5a45d 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -49,6 +49,7 @@ supports query SQL and can achieve projection effect.
 | password                                   | String  | No       | -          
     | password                                                                 
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | query                                      | String  | No       | -          
     | Query statement                                                          
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | compatible_mode                            | String  | No       | -          
     | The compatible mode of database, required when the database supports 
multiple compatible modes.<br/> For example, when using OceanBase database, you 
need to set it to 'mysql' or 'oracle'. <br/> when using starrocks, you need set 
it to `starrocks`                                                               
                                                                                
                  [...]
+| dialect                                    | String  | No       | -          
     | Specifies the dialect to use. If it does not exist, the dialect is still 
determined from the url; otherwise it takes priority over the url. <br/> For 
example, when using starrocks, you need to set it to `starrocks`                
                                                                                
                                                                                
                  [...]
 | connection_check_timeout_sec               | Int     | No       | 30         
     | The time in seconds to wait for the database operation used to validate 
the connection to complete.                                                     
                                                                                
                                                                                
                                                                                
               [...]
 | partition_column                           | String  | No       | -          
     | The column name for split data.                                          
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 | partition_upper_bound                      | Long    | No       | -          
     | The partition_column max value for scan, if not set SeaTunnel will query 
database get max value.                                                         
                                                                                
                                                                                
                                                                                
              [...]
@@ -93,6 +94,24 @@ decimal_type_narrowing = false
 | NUMBER(6, 0)  | Decimal(6, 0)  |
 | NUMBER(10, 0) | Decimal(10, 0) |
 
+### dialect [string]
+
+Specifies the dialect to use. If it does not exist, the dialect is still 
determined from the url; otherwise it takes priority over the url. For 
example, when using starrocks, you need to set it to `starrocks`. Similarly, 
when using mysql, you need to set its value to `mysql`.
+
+#### dialect list
+
+|           | Dialect Name |          |
+|-----------|--------------|----------|
+| Greenplum | DB2          | Dameng   |
+| Gbase8a   | HIVE         | KingBase |
+| MySQL     | StarRocks    | Oracle   |
+| Phoenix   | Postgres     | Redshift |
+| SapHana   | Snowflake    | Sqlite   |
+| SqlServer | Tablestore   | Teradata |
+| Vertica   | OceanBase    | XUGU     |
+| IRIS      | Inceptor     | Highgo   |
+
+
 ## Parallel Reader
 
 The JDBC Source connector supports parallel reading of data from tables. 
SeaTunnel will use certain rules to split the data in the table, which will be 
handed over to readers for reading. The number of readers is determined by the 
`parallelism` option.
diff --git a/docs/zh/connector-v2/sink/Jdbc.md 
b/docs/zh/connector-v2/sink/Jdbc.md
index bcfd5d5a6d..834c0579e0 100644
--- a/docs/zh/connector-v2/sink/Jdbc.md
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -29,7 +29,7 @@ import ChangeLog from '../changelog/connector-jdbc.md';
 
 ## Options
 
-|                    名称                     |   类型    | 是否必须 |             默认值 
             |
+| 名称                                        | 类型      | 是否必须 | 默认值             
             |
 
|-------------------------------------------|---------|------|------------------------------|
 | url                                       | String  | 是    | -               
             |
 | driver                                    | String  | 是    | -               
             |
@@ -37,6 +37,7 @@ import ChangeLog from '../changelog/connector-jdbc.md';
 | password                                  | String  | 否    | -               
             |
 | query                                     | String  | 否    | -               
             |
 | compatible_mode                           | String  | 否    | -               
             |
+| dialect                                   | String  | 否    | -               
             | 
 | database                                  | String  | 否    | -               
             |
 | table                                     | String  | 否    | -               
             |
 | primary_keys                              | Array   | 否    | -               
             |
@@ -87,6 +88,24 @@ JDBC 连接的 URL。参考案例:`jdbc:postgresql://localhost/test`
 
 Postgres 9.5及以下版本,请设置为 `postgresLow` 来支持 CDC
 
+### dialect [string]
+
+指定的方言,如果不存在,仍然按照url获取,优先级高于url。例如,当使用 starrocks 时,你需要将其值设置为 
starrocks,同理,当使用mysql时,你需要将其值设置为mysql。
+
+#### 方言列表
+
+|           | 方言名称       |          |
+|-----------|------------|----------|
+| Greenplum | DB2        | Dameng   |
+| Gbase8a   | HIVE       | KingBase |
+| MySQL     | StarRocks  | Oracle   |
+| Phoenix   | Postgres   | Redshift |
+| SapHana   | Snowflake  | Sqlite   |
+| SqlServer | Tablestore | Teradata |
+| Vertica   | OceanBase  | XUGU     |
+| IRIS      | Inceptor   | Highgo   |
+
+
 ### database [string]
 
 使用此 `database` 和 `table-name` 自动生成 SQL,并接收上游输入的数据写入数据库。
@@ -122,7 +141,8 @@ Tip: 如果目标数据库有 SCHEMA 的概念,则表参数必须写成 `xxx.x
 
 ### support_upsert_by_query_primary_key_exist [boolean]
 
-根据查询主键是否存在来选择使用 INSERT sql、UPDATE sql 来处理变更事件(INSERT、UPDATE_AFTER)。仅当数据库不支持 
upsert 语法时才使用此配置
+根据查询主键是否存在来选择使用 INSERT sql、UPDATE sql 来处理变更事件(INSERT、UPDATE_AFTER)。仅当数据库不支持 
upsert
+语法时才使用此配置
 **注意**:该方法性能较低
 
 ### connection_check_timeout_sec [int]
@@ -163,7 +183,8 @@ Tip: 如果目标数据库有 SCHEMA 的概念,则表参数必须写成 `xxx.x
 
 ### field_ide [String]
 
-字段 `field_ide` 用于在从 source 同步到 sink 时,确定字段是否需要转换为大写或小写。'ORIGINAL' 
表示不需要转换,'UPPERCASE' 表示转换为大写,'LOWERCASE' 表示转换为小写
+字段 `field_ide` 用于在从 source 同步到 sink 时,确定字段是否需要转换为大写或小写。'ORIGINAL' 
表示不需要转换,'UPPERCASE'
+表示转换为大写,'LOWERCASE' 表示转换为小写
 
 ### properties
 
@@ -218,28 +239,29 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
 
 附录参数仅提供参考
 
-|    数据源     |                    driver                    |                  
              url                                 |             
xa_data_source_class_name              | maven                                  
                                                               |
-|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------|
-| MySQL      | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | 
com.mysql.cj.jdbc.MysqlXADataSource                | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                      |
-| PostgreSQL | org.postgresql.Driver                        | 
jdbc:postgresql://localhost:5432/postgres                          | 
org.postgresql.xa.PGXADataSource                   | 
https://mvnrepository.com/artifact/org.postgresql/postgresql                    
                      |
-| DM         | dm.jdbc.driver.DmDriver                      | 
jdbc:dm://localhost:5236                                           | 
dm.jdbc.driver.DmdbXADataSource                    | 
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                    
                      |
-| Phoenix    | org.apache.phoenix.queryserver.client.Driver | 
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /          
                                        | 
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
                  |
-| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | 
jdbc:sqlserver://localhost:1433                                    | 
com.microsoft.sqlserver.jdbc.SQLServerXADataSource | 
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc           
                      |
-| Oracle     | oracle.jdbc.OracleDriver                     | 
jdbc:oracle:thin:@localhost:1521/xepdb1                            | 
oracle.jdbc.xa.OracleXADataSource                  | 
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8              
                      |
-| sqlite     | org.sqlite.JDBC                              | 
jdbc:sqlite:test.db                                                | /          
                                        | 
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc                       
                      |
-| GBase8a    | com.gbase.jdbc.Driver                        | 
jdbc:gbase://e2e_gbase8aDb:5258/test                               | /          
                                        | 
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
    |
-| StarRocks  | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | /          
                                        | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                      |
-| db2        | com.ibm.db2.jcc.DB2Driver                    | 
jdbc:db2://localhost:50000/testdb                                  | 
com.ibm.db2.jcc.DB2XADataSource                    | 
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4               
                      |
-| saphana    | com.sap.db.jdbc.Driver                       | 
jdbc:sap://localhost:39015                                         | /          
                                        | 
https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc                  
                      |
-| Doris      | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | /          
                                        | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                      |
-| teradata   | com.teradata.jdbc.TeraDriver                 | 
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test              | /          
                                        | 
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc                   
                      |
-| Redshift   | com.amazon.redshift.jdbc42.Driver            | 
jdbc:redshift://localhost:5439/testdb                              | 
com.amazon.redshift.xa.RedshiftXADataSource        | 
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42          
                      |
-| Snowflake  | net.snowflake.client.jdbc.SnowflakeDriver    | 
jdbc&#58;snowflake://<account_name>.snowflakecomputing.com         | /          
                                        | 
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc                 
                      |
-| Vertica    | com.vertica.jdbc.Driver                      | 
jdbc:vertica://localhost:5433                                      | /          
                                        | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
       |
-| Kingbase   | com.kingbase8.Driver                         | 
jdbc:kingbase8://localhost:54321/db_test                           | /          
                                        | 
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
                    |
-| OceanBase  | com.oceanbase.jdbc.Driver                    | 
jdbc:oceanbase://localhost:2881                                    | /          
                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
      |
-| opengauss  | org.opengauss.Driver                         | 
jdbc:opengauss://localhost:5432/postgres                           | /          
                                        | 
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
      |
-| Highgo     | com.highgo.jdbc.Driver                       | 
jdbc:highgo://localhost:5866/highgo                                | /          
                                        | 
https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar     
                      |
+| 数据源        | driver                                       | url              
                                                  | xa_data_source_class_name   
                       | maven                                                  
                                            |
+|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|----------------------------------------------------------------------------------------------------|
+| MySQL      | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | 
com.mysql.cj.jdbc.MysqlXADataSource                | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                   |
+| PostgreSQL | org.postgresql.Driver                        | 
jdbc:postgresql://localhost:5432/postgres                          | 
org.postgresql.xa.PGXADataSource                   | 
https://mvnrepository.com/artifact/org.postgresql/postgresql                    
                   |
+| DM         | dm.jdbc.driver.DmDriver                      | 
jdbc:dm://localhost:5236                                           | 
dm.jdbc.driver.DmdbXADataSource                    | 
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                    
                   |
+| Phoenix    | org.apache.phoenix.queryserver.client.Driver | 
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /          
                                        | 
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
               |
+| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | 
jdbc:sqlserver://localhost:1433                                    | 
com.microsoft.sqlserver.jdbc.SQLServerXADataSource | 
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc           
                   |
+| Oracle     | oracle.jdbc.OracleDriver                     | 
jdbc:oracle:thin:@localhost:1521/xepdb1                            | 
oracle.jdbc.xa.OracleXADataSource                  | 
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8              
                   |
+| sqlite     | org.sqlite.JDBC                              | 
jdbc:sqlite:test.db                                                | /          
                                        | 
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc                       
                   |
+| GBase8a    | com.gbase.jdbc.Driver                        | 
jdbc:gbase://e2e_gbase8aDb:5258/test                               | /          
                                        | 
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
 |
+| StarRocks  | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | /          
                                        | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                   |
+| db2        | com.ibm.db2.jcc.DB2Driver                    | 
jdbc:db2://localhost:50000/testdb                                  | 
com.ibm.db2.jcc.DB2XADataSource                    | 
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4               
                   |
+| saphana    | com.sap.db.jdbc.Driver                       | 
jdbc:sap://localhost:39015                                         | /          
                                        | 
https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc                  
                   |
+| Doris      | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | /          
                                        | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                   |
+| teradata   | com.teradata.jdbc.TeraDriver                 | 
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test              | /          
                                        | 
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc                   
                   |
+| Redshift   | com.amazon.redshift.jdbc42.Driver            | 
jdbc:redshift://localhost:5439/testdb                              | 
com.amazon.redshift.xa.RedshiftXADataSource        | 
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42          
                   |
+| Snowflake  | net.snowflake.client.jdbc.SnowflakeDriver    | 
jdbc&#58;snowflake://<account_name>.snowflakecomputing.com         | /          
                                        | 
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc                 
                   |
+| Vertica    | com.vertica.jdbc.Driver                      | 
jdbc:vertica://localhost:5433                                      | /          
                                        | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
    |
+| Kingbase   | com.kingbase8.Driver                         | 
jdbc:kingbase8://localhost:54321/db_test                           | /          
                                        | 
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
                 |
+| OceanBase  | com.oceanbase.jdbc.Driver                    | 
jdbc:oceanbase://localhost:2881                                    | /          
                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
   |
+| opengauss  | org.opengauss.Driver                         | 
jdbc:opengauss://localhost:5432/postgres                           | /          
                                        | 
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
   |
+| Highgo     | com.highgo.jdbc.Driver                       | 
jdbc:highgo://localhost:5866/highgo                                | /          
                                        | 
https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar     
                   |
+
 ## 示例
 
 简单示例
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
index 053ab71a41..9e45646f09 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
@@ -19,43 +19,48 @@ package 
org.apache.seatunnel.connectors.seatunnel.jdbc.config;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 
+import lombok.Getter;
+
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+@Getter
 public class JdbcConnectionConfig implements Serializable {
     private static final long serialVersionUID = 2L;
 
-    public String url;
-    public String driverName;
-    public String compatibleMode;
-    public int connectionCheckTimeoutSeconds =
+    private String url;
+    private String driverName;
+    private String compatibleMode;
+    private int connectionCheckTimeoutSeconds =
             JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue();
-    public int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue();
-    public String username;
-    public String password;
-    public String query;
+    private int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue();
+    private String username;
+    private String password;
+    private String query;
+
+    private boolean autoCommit = JdbcOptions.AUTO_COMMIT.defaultValue();
 
-    public boolean autoCommit = JdbcOptions.AUTO_COMMIT.defaultValue();
+    private int batchSize = JdbcOptions.BATCH_SIZE.defaultValue();
 
-    public int batchSize = JdbcOptions.BATCH_SIZE.defaultValue();
+    private String xaDataSourceClassName;
 
-    public String xaDataSourceClassName;
+    private boolean decimalTypeNarrowing = 
JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue();
 
-    public boolean decimalTypeNarrowing = 
JdbcOptions.DECIMAL_TYPE_NARROWING.defaultValue();
+    private int maxCommitAttempts = 
JdbcOptions.MAX_COMMIT_ATTEMPTS.defaultValue();
 
-    public int maxCommitAttempts = 
JdbcOptions.MAX_COMMIT_ATTEMPTS.defaultValue();
+    private int transactionTimeoutSec = 
JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
 
-    public int transactionTimeoutSec = 
JdbcOptions.TRANSACTION_TIMEOUT_SEC.defaultValue();
+    private boolean useKerberos = JdbcOptions.USE_KERBEROS.defaultValue();
 
-    public boolean useKerberos = JdbcOptions.USE_KERBEROS.defaultValue();
+    private String kerberosPrincipal;
 
-    public String kerberosPrincipal;
+    private String kerberosKeytabPath;
 
-    public String kerberosKeytabPath;
+    private String krb5Path = JdbcOptions.KRB5_PATH.defaultValue();
 
-    public String krb5Path = JdbcOptions.KRB5_PATH.defaultValue();
+    private String dialect = JdbcOptions.DIALECT.defaultValue();
 
     private Map<String, String> properties;
 
@@ -85,37 +90,10 @@ public class JdbcConnectionConfig implements Serializable {
         
config.getOptional(JdbcOptions.PROPERTIES).ifPresent(builder::properties);
         config.getOptional(JdbcOptions.DECIMAL_TYPE_NARROWING)
                 .ifPresent(builder::decimalTypeNarrowing);
+        config.getOptional(JdbcOptions.DIALECT).ifPresent(builder::dialect);
         return builder.build();
     }
 
-    public String getUrl() {
-        return url;
-    }
-
-    public String getDriverName() {
-        return driverName;
-    }
-
-    public String getCompatibleMode() {
-        return compatibleMode;
-    }
-
-    public boolean isAutoCommit() {
-        return autoCommit;
-    }
-
-    public int getConnectionCheckTimeoutSeconds() {
-        return connectionCheckTimeoutSeconds;
-    }
-
-    public int getMaxRetries() {
-        return maxRetries;
-    }
-
-    public boolean isDecimalTypeNarrowing() {
-        return decimalTypeNarrowing;
-    }
-
     public Optional<String> getUsername() {
         return Optional.ofNullable(username);
     }
@@ -124,26 +102,10 @@ public class JdbcConnectionConfig implements Serializable 
{
         return Optional.ofNullable(password);
     }
 
-    public int getBatchSize() {
-        return batchSize;
-    }
-
-    public String getXaDataSourceClassName() {
-        return xaDataSourceClassName;
-    }
-
-    public int getMaxCommitAttempts() {
-        return maxCommitAttempts;
-    }
-
     public Optional<Integer> getTransactionTimeoutSec() {
         return transactionTimeoutSec < 0 ? Optional.empty() : 
Optional.of(transactionTimeoutSec);
     }
 
-    public Map<String, String> getProperties() {
-        return properties;
-    }
-
     public static JdbcConnectionConfig.Builder builder() {
         return new JdbcConnectionConfig.Builder();
     }
@@ -169,6 +131,7 @@ public class JdbcConnectionConfig implements Serializable {
         public String kerberosPrincipal;
         public String kerberosKeytabPath;
         public String krb5Path = JdbcOptions.KRB5_PATH.defaultValue();
+        public String dialect = JdbcOptions.DIALECT.defaultValue();
 
         private Builder() {}
 
@@ -262,6 +225,11 @@ public class JdbcConnectionConfig implements Serializable {
             return this;
         }
 
+        public Builder dialect(String dialect) {
+            this.dialect = dialect;
+            return this;
+        }
+
         public Builder properties(Map<String, String> properties) {
             this.properties = properties;
             return this;
@@ -286,6 +254,7 @@ public class JdbcConnectionConfig implements Serializable {
             jdbcConnectionConfig.kerberosPrincipal = this.kerberosPrincipal;
             jdbcConnectionConfig.kerberosKeytabPath = this.kerberosKeytabPath;
             jdbcConnectionConfig.krb5Path = this.krb5Path;
+            jdbcConnectionConfig.dialect = this.dialect;
             jdbcConnectionConfig.properties =
                     this.properties == null ? new HashMap<>() : this.properties;
             return jdbcConnectionConfig;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index ce1ac866cb..319f4cecf1 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -164,6 +164,13 @@ public interface JdbcOptions {
                     .defaultValue(false)
                     .withDescription("support copy in statement (postgresql)");
 
+    Option<String> DIALECT =
+            Options.key("dialect")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The appointed dialect, if it does not exist, is still obtained according to the url");
+
     /** source config */
     Option<String> PARTITION_COLUMN =
             Options.key("partition_column")
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
index 8588ef16df..a6e12ad6bf 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java
@@ -67,7 +67,9 @@ public class JdbcInputFormat implements Serializable {
     public JdbcInputFormat(JdbcSourceConfig config, Map<TablePath, CatalogTable> tables) {
         this.jdbcDialect =
                 JdbcDialectLoader.load(
-                        config.getJdbcConnectionConfig().getUrl(), config.getCompatibleMode());
+                        config.getJdbcConnectionConfig().getUrl(),
+                        config.getJdbcConnectionConfig().getDialect(),
+                        config.getCompatibleMode());
         this.chunkSplitter = ChunkSplitter.create(config);
         this.jdbcRowConverter = jdbcDialect.getRowConverter();
         this.tables = tables;
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index 0f5bf95010..cf503be4d1 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -45,4 +45,5 @@ public class DatabaseIdentifier {
     public static final String INCEPTOR = "Inceptor";
     public static final String OPENGAUSS = "OpenGauss";
     public static final String HIGHGO = "Highgo";
+    public static final String GREENPLUM = "Greenplum";
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialectFactory.java
index f0d32988bf..7911d1e23a 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialectFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/GenericDialectFactory.java
@@ -25,6 +25,11 @@ import javax.annotation.Nonnull;
 @AutoService(JdbcDialectFactory.class)
 public class GenericDialectFactory implements JdbcDialectFactory {
 
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.GENERIC;
+    }
+
     // GenericDialect does not have any special requirements.
     @Override
     public boolean acceptsURL(String url) {
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
index 5439937f53..93ddb78248 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
@@ -24,6 +24,12 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
  */
 public interface JdbcDialectFactory {
 
+    /**
+     * Retrieves the name of the dialect.
+     *
+     * @return the name of the dialect
+     */
+    String dialectFactoryName();
     /**
      * Retrieves whether the dialect thinks that it can open a connection to the given URL.
      * Typically, dialects will return <code>true</code> if they understand the sub-protocol
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
index 7dc71835ae..7068e84f21 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
@@ -36,8 +36,8 @@ public final class JdbcDialectLoader {
 
     private JdbcDialectLoader() {}
 
-    public static JdbcDialect load(String url, String compatibleMode) {
-        return load(url, compatibleMode, "");
+    public static JdbcDialect load(String url, String dialect, String compatibleMode) {
+        return load(url, compatibleMode, dialect, "");
     }
 
     /**
@@ -45,11 +45,12 @@ public final class JdbcDialectLoader {
      *
      * @param url A database URL.
      * @param compatibleMode The compatible mode.
+     * @return The loaded dialect.
      * @throws IllegalStateException if the loader cannot find exactly one dialect that can
      *     unambiguously process the given database URL.
-     * @return The loaded dialect.
      */
-    public static JdbcDialect load(String url, String compatibleMode, String fieldIde) {
+    public static JdbcDialect load(
+            String url, String compatibleMode, String dialect, String fieldIde) {
         ClassLoader cl = Thread.currentThread().getContextClassLoader();
         List<JdbcDialectFactory> foundFactories = discoverFactories(cl);
 
@@ -60,9 +61,18 @@ public final class JdbcDialectLoader {
                             "Could not find any jdbc dialect factories that implement '%s' in the classpath.",
                             JdbcDialectFactory.class.getName()));
         }
-
-        List<JdbcDialectFactory> matchingFactories =
-                foundFactories.stream().filter(f -> f.acceptsURL(url)).collect(Collectors.toList());
+        List<JdbcDialectFactory> matchingFactories;
+        if (dialect != null) {
+            matchingFactories =
+                    foundFactories.stream()
+                            .filter(f -> f.dialectFactoryName().equalsIgnoreCase(dialect))
+                            .collect(Collectors.toList());
+        } else {
+            matchingFactories =
+                    foundFactories.stream()
+                            .filter(f -> f.acceptsURL(url))
+                            .collect(Collectors.toList());
+        }
 
         // filter out generic dialect factory
         if (matchingFactories.size() > 1) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2DialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2DialectFactory.java
index bbc7536f8c..99b66f17b8 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2DialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/db2/DB2DialectFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -26,6 +27,11 @@ import com.google.auto.service.AutoService;
 @AutoService(JdbcDialectFactory.class)
 public class DB2DialectFactory implements JdbcDialectFactory {
 
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.DB_2;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:db2:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialectFactory.java
index 3d29f2f3e5..80f29cef45 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dm/DmdbDialectFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum;
@@ -27,6 +28,11 @@ import com.google.auto.service.AutoService;
 @AutoService(JdbcDialectFactory.class)
 public class DmdbDialectFactory implements JdbcDialectFactory {
 
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.DAMENG;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:dm:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java
index 31787f4974..7c8f99cf26 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -24,6 +25,11 @@ import com.google.auto.service.AutoService;
 
 @AutoService(JdbcDialectFactory.class)
 public class Gbase8aDialectFactory implements JdbcDialectFactory {
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.GBASE_8A;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:gbase:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java
index fb4ca3865f..6fa570de76 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/greenplum/GreenplumDialectFactory.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.greenplum;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;
@@ -27,6 +28,11 @@ import lombok.NonNull;
 @AutoService(JdbcDialectFactory.class)
 public class GreenplumDialectFactory implements JdbcDialectFactory {
 
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.GREENPLUM;
+    }
+
     @Override
     public boolean acceptsURL(@NonNull String url) {
         // Support greenplum native driver: com.pivotal.jdbc.GreenplumDriver
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/highgo/HighGoDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/highgo/HighGoDialectFactory.java
index 6f3376cefa..aac82185e1 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/highgo/HighGoDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/highgo/HighGoDialectFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.highgo;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialectFactory;
 
@@ -23,6 +24,11 @@ import com.google.auto.service.AutoService;
 
 @AutoService(JdbcDialectFactory.class)
 public class HighGoDialectFactory extends PostgresDialectFactory {
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.HIGHGO;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:highgo:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java
index 3ddf3bfab8..c8af29489b 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialectFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.inceptor.InceptorDialect;
@@ -27,6 +28,11 @@ import com.google.auto.service.AutoService;
 @AutoService(JdbcDialectFactory.class)
 public class HiveDialectFactory implements JdbcDialectFactory {
 
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.HIVE;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:hive2:");
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java
index 57b591c4c9..152a79a1ce 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveJdbcConnectionProvider.java
@@ -48,7 +48,7 @@ public class HiveJdbcConnectionProvider extends SimpleJdbcConnectionProvider {
         HiveConnectionProduceFunction hiveConnectionProduceFunction =
                 new HiveConnectionProduceFunction(driver, jdbcConfig);
 
-        if (jdbcConfig.useKerberos) {
+        if (jdbcConfig.isUseKerberos()) {
             super.setConnection(getConnectionWithKerberos(hiveConnectionProduceFunction));
         } else {
             super.setConnection(hiveConnectionProduceFunction.produce());
@@ -70,9 +70,9 @@ public class HiveJdbcConnectionProvider extends SimpleJdbcConnectionProvider {
             configuration.set("hadoop.security.authentication", "kerberos");
             return HadoopLoginFactory.loginWithKerberos(
                     configuration,
-                    jdbcConfig.krb5Path,
-                    jdbcConfig.kerberosPrincipal,
-                    jdbcConfig.kerberosKeytabPath,
+                    jdbcConfig.getKrb5Path(),
+                    jdbcConfig.getKerberosPrincipal(),
+                    jdbcConfig.getKerberosKeytabPath(),
                     (conf, userGroupInformation) -> hiveConnectionProduceFunction.produce());
         } catch (Exception ex) {
             throw new JdbcConnectorException(KERBEROS_AUTHENTICATION_FAILED, ex);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialectFactory.java
index c1e45c9666..98e01b88ed 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialectFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -28,6 +29,11 @@ import javax.annotation.Nonnull;
 @AutoService(JdbcDialectFactory.class)
 public class IrisDialectFactory implements JdbcDialectFactory {
 
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.IRIS;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:IRIS:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java
index f999861035..ad6c7d311b 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -26,6 +27,11 @@ import com.google.auto.service.AutoService;
 @AutoService(JdbcDialectFactory.class)
 public class KingbaseDialectFactory implements JdbcDialectFactory {
 
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.KINGBASE;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:kingbase8:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
index f8278a60cc..eb0a0b7619 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlDialectFactory.java
@@ -29,6 +29,11 @@ import javax.annotation.Nonnull;
 /** Factory for {@link MysqlDialect}. */
 @AutoService(JdbcDialectFactory.class)
 public class MySqlDialectFactory implements JdbcDialectFactory {
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.MYSQL;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:mysql:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
index d25d48b4f2..700015bdb6 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
@@ -27,6 +28,11 @@ import javax.annotation.Nonnull;
 
 @AutoService(JdbcDialectFactory.class)
 public class OceanBaseDialectFactory implements JdbcDialectFactory {
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.OCENABASE;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:oceanbase:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/opengauss/OpenGaussDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/opengauss/OpenGaussDialectFactory.java
index 899e2a4768..bbdc6b7418 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/opengauss/OpenGaussDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/opengauss/OpenGaussDialectFactory.java
@@ -16,6 +16,7 @@
  */
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.opengauss;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialectFactory;
@@ -27,6 +28,11 @@ import javax.annotation.Nonnull;
 @AutoService(JdbcDialectFactory.class)
 public class OpenGaussDialectFactory extends PostgresDialectFactory {
 
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.OPENGAUSS;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:opengauss:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
index 121098c461..54958894da 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oracle/OracleDialectFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -27,6 +28,11 @@ import javax.annotation.Nonnull;
 /** Factory for {@link OracleDialect}. */
 @AutoService(JdbcDialectFactory.class)
 public class OracleDialectFactory implements JdbcDialectFactory {
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.ORACLE;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:oracle:thin:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialectFactory.java
index 33851af147..ff0e7b15b8 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/phoenix/PhoenixDialectFactory.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -26,6 +27,11 @@ import lombok.NonNull;
 @AutoService(JdbcDialectFactory.class)
 public class PhoenixDialectFactory implements JdbcDialectFactory {
 
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.PHOENIX;
+    }
+
     @Override
     public boolean acceptsURL(@NonNull String url) {
         return url.startsWith("jdbc:phoenix:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
index 59dc0b45c6..8ec87b6737 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialectFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psqllow.PostgresLowDialect;
@@ -27,6 +28,11 @@ import javax.annotation.Nonnull;
 
 @AutoService(JdbcDialectFactory.class)
 public class PostgresDialectFactory implements JdbcDialectFactory {
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.POSTGRESQL;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:postgresql:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialectFactory.java
index 7470f7a0d4..b31a47ea39 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/redshift/RedshiftDialectFactory.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -24,6 +25,11 @@ import com.google.auto.service.AutoService;
 
 @AutoService(JdbcDialectFactory.class)
 public class RedshiftDialectFactory implements JdbcDialectFactory {
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.REDSHIFT;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:redshift:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaDialectFactory.java
index 2d4ea8ad30..d82fe38495 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/saphana/SapHanaDialectFactory.java
@@ -18,6 +18,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -26,6 +27,11 @@ import com.google.auto.service.AutoService;
 /** Dialect Factory of {@link SapHanaDialect} */
 @AutoService(JdbcDialectFactory.class)
 public class SapHanaDialectFactory implements JdbcDialectFactory {
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.SAP_HANA;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:sap://");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeDialectFactory.java
index 0b30c6e4fd..2112cff999 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/snowflake/SnowflakeDialectFactory.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.snowflake;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -25,6 +26,11 @@ import com.google.auto.service.AutoService;
 /** Factory for {@link SnowflakeDialect}. */
 @AutoService(JdbcDialectFactory.class)
 public class SnowflakeDialectFactory implements JdbcDialectFactory {
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.SNOWFLAKE;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:snowflake:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialectFactory.java
index 181da6e0a1..3e7e777e26 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlite/SqliteDialectFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -25,6 +26,11 @@ import com.google.auto.service.AutoService;
 /** Factory for {@link SqliteDialect}. */
 @AutoService(JdbcDialectFactory.class)
 public class SqliteDialectFactory implements JdbcDialectFactory {
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.SQLITE;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:sqlite:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
index d7dae4efd5..2f89c96719 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -27,6 +28,11 @@ import javax.annotation.Nonnull;
 /** Factory for {@link SqlServerDialect}. */
 @AutoService(JdbcDialectFactory.class)
 public class SqlServerDialectFactory implements JdbcDialectFactory {
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.SQLSERVER;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:sqlserver:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java
index a7285da5a8..810b49d5fe 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialectFactory.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestore;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -25,6 +26,11 @@ import com.google.auto.service.AutoService;
 /** Factory for {@link TablestoreDialect}. */
 @AutoService(JdbcDialectFactory.class)
 public class TablestoreDialectFactory implements JdbcDialectFactory {
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.TABLE_STORE;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:ots:https:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialectFactory.java
index 70a4492868..50cae2ff95 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/teradata/TeradataDialectFactory.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -25,6 +26,11 @@ import com.google.auto.service.AutoService;
 @AutoService(JdbcDialectFactory.class)
 public class TeradataDialectFactory implements JdbcDialectFactory {
 
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.TERADATA;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:teradata:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectFactory.java
index 3a602de940..8a45127658 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/vertica/VerticaDialectFactory.java
@@ -17,6 +17,7 @@
 
 package 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -25,6 +26,11 @@ import com.google.auto.service.AutoService;
 /** Factory for {@link VerticaDialect}. */
 @AutoService(JdbcDialectFactory.class)
 public class VerticaDialectFactory implements JdbcDialectFactory {
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.VERTICA;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:vertica:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialectFactory.java
index 0e489b728b..f0313317cf 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialectFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialectFactory.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.xugu;
 
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
 
@@ -28,6 +29,11 @@ import javax.annotation.Nonnull;
 @AutoService(JdbcDialectFactory.class)
 public class XuguDialectFactory implements JdbcDialectFactory {
 
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.XUGU;
+    }
+
     @Override
     public boolean acceptsURL(String url) {
         return url.startsWith("jdbc:xugu:");
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index 3af53c8839..5835b38e7b 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -60,6 +60,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.CUSTOM_SQL;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DATABASE;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DATA_SAVE_MODE;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DIALECT;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DRIVER;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.GENERATE_SINK_SQL;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.IS_EXACTLY_ONCE;
@@ -234,6 +235,7 @@ public class JdbcSinkFactory implements TableSinkFactory {
                 JdbcDialectLoader.load(
                         sinkConfig.getJdbcConnectionConfig().getUrl(),
                         
sinkConfig.getJdbcConnectionConfig().getCompatibleMode(),
+                        sinkConfig.getJdbcConnectionConfig().getDialect(),
                         fieldIdeEnum == null ? null : fieldIdeEnum.getValue());
         dialect.connectionUrlParse(
                 sinkConfig.getJdbcConnectionConfig().getUrl(),
@@ -269,7 +271,8 @@ public class JdbcSinkFactory implements TableSinkFactory {
                         SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST,
                         PRIMARY_KEYS,
                         COMPATIBLE_MODE,
-                        MULTI_TABLE_SINK_REPLICA)
+                        MULTI_TABLE_SINK_REPLICA,
+                        DIALECT)
                 .conditional(
                         IS_EXACTLY_ONCE,
                         true,
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
index c41f275f6a..3b5cd058a9 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java
@@ -65,7 +65,9 @@ public abstract class ChunkSplitter implements AutoCloseable, 
Serializable {
         this.fetchSize = config.getFetchSize();
         this.jdbcDialect =
                 JdbcDialectLoader.load(
-                        config.getJdbcConnectionConfig().getUrl(), 
config.getCompatibleMode());
+                        config.getJdbcConnectionConfig().getUrl(),
+                        config.getJdbcConnectionConfig().getDialect(),
+                        config.getCompatibleMode());
         this.connectionProvider =
                 
jdbcDialect.getJdbcConnectionProvider(config.getJdbcConnectionConfig());
     }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
index b9ca90ed53..720b9d84e9 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
@@ -35,6 +35,7 @@ import java.io.Serializable;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.COMPATIBLE_MODE;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC;
+import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DIALECT;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DRIVER;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.FETCH_SIZE;
 import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.PARTITION_COLUMN;
@@ -72,6 +73,7 @@ public class JdbcSourceFactory implements TableSourceFactory {
         JdbcDialect jdbcDialect =
                 JdbcDialectLoader.load(
                         config.getJdbcConnectionConfig().getUrl(),
+                        config.getJdbcConnectionConfig().getDialect(),
                         config.getJdbcConnectionConfig().getCompatibleMode());
         jdbcDialect.connectionUrlParse(
                 config.getJdbcConnectionConfig().getUrl(),
@@ -105,7 +107,8 @@ public class JdbcSourceFactory implements 
TableSourceFactory {
                         SPLIT_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND,
                         SPLIT_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND,
                         SPLIT_SAMPLE_SHARDING_THRESHOLD,
-                        SPLIT_INVERSE_SAMPLING_RATE)
+                        SPLIT_INVERSE_SAMPLING_RATE,
+                        DIALECT)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/HiveJdbcUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/HiveJdbcUtils.java
index 0b503b3084..b61cb2e878 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/HiveJdbcUtils.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/HiveJdbcUtils.java
@@ -34,9 +34,9 @@ import static 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConne
 public class HiveJdbcUtils {
 
     public static synchronized void 
doKerberosAuthentication(JdbcConnectionConfig jdbcConfig) {
-        String principal = jdbcConfig.kerberosPrincipal;
-        String keytabPath = jdbcConfig.kerberosKeytabPath;
-        String krb5Path = jdbcConfig.krb5Path;
+        String principal = jdbcConfig.getKerberosPrincipal();
+        String keytabPath = jdbcConfig.getKerberosKeytabPath();
+        String krb5Path = jdbcConfig.getKrb5Path();
         System.setProperty("java.security.krb5.conf", krb5Path);
         Configuration configuration = new Configuration();
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index 0d53e3dec0..782f3c62e7 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -71,7 +71,9 @@ public class JdbcCatalogUtils {
 
         JdbcDialect jdbcDialect =
                 JdbcDialectLoader.load(
-                        jdbcConnectionConfig.getUrl(), 
jdbcConnectionConfig.getCompatibleMode());
+                        jdbcConnectionConfig.getUrl(),
+                        jdbcConnectionConfig.getDialect(),
+                        jdbcConnectionConfig.getCompatibleMode());
         Optional<Catalog> catalog = findCatalog(jdbcConnectionConfig, 
jdbcDialect);
         if (catalog.isPresent()) {
             try (AbstractJdbcCatalog jdbcCatalog = (AbstractJdbcCatalog) 
catalog.get()) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoaderTest.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoaderTest.java
index 84dd36acfb..90e934cd42 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoaderTest.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoaderTest.java
@@ -19,6 +19,7 @@
 package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect;
 
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -27,13 +28,22 @@ import org.junit.jupiter.api.Test;
 public class JdbcDialectLoaderTest {
     @Test
     public void shouldFindGenericDialect() throws Exception {
-        JdbcDialect jdbcDialect = JdbcDialectLoader.load("jdbc:someting:", "");
-        Assertions.assertTrue(jdbcDialect instanceof GenericDialect);
+        JdbcDialect jdbcDialect = JdbcDialectLoader.load("jdbc:someting:", 
null, "");
+        Assertions.assertInstanceOf(GenericDialect.class, jdbcDialect);
     }
 
     @Test
     public void shouldFindMysqlDialect() throws Exception {
-        JdbcDialect jdbcDialect = 
JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test", "");
-        Assertions.assertTrue(jdbcDialect instanceof MysqlDialect);
+        JdbcDialect jdbcDialect =
+                JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test", 
null, "");
+        Assertions.assertInstanceOf(MysqlDialect.class, jdbcDialect);
+    }
+
+    /** Verifies that {@link JdbcDialectLoader} selects the dialect by its explicitly given name, even when the URL is not a PostgreSQL URL. */
+    @Test
+    public void shouldFindPostgresSQLDialectByDialect() throws Exception {
+        JdbcDialect jdbcDialect =
+                JdbcDialectLoader.load("error:errorurl://xxxxx:3306/test", 
"Postgres", "");
+        Assertions.assertInstanceOf(PostgresDialect.class, jdbcDialect);
     }
 }

Reply via email to