This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ff47b4a842 [Feature][Connector-V2] Support sink connector for AWS DSQL 
#9726 (#9739)
ff47b4a842 is described below

commit ff47b4a842e07f1b4cfd866232994a1bf5de5081
Author: cloud456 <[email protected]>
AuthorDate: Sat Nov 1 14:52:18 2025 +0800

    [Feature][Connector-V2] Support sink connector for AWS DSQL #9726 (#9739)
    
    Co-authored-by: cloud456 <[email protected]>
    Co-authored-by: cloud456 <[email protected]>
    Co-authored-by: David Zollo <[email protected]>
---
 .github/workflows/labeler/label-scope-conf.yml     |   1 -
 docs/en/connector-v2/sink/Jdbc.md                  | 112 +++++++++++----
 docs/zh/connector-v2/sink/Jdbc.md                  |  64 ++++++++-
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  24 ++++
 .../seatunnel/jdbc/config/JdbcCommonOptions.java   |  14 ++
 .../jdbc/config/JdbcConnectionConfig.java          |  29 ++++
 .../jdbc/internal/dialect/DatabaseIdentifier.java  |   1 +
 .../dsql/DdsqlJdbcConnectionPoolProviderProxy.java |  76 +++++++++++
 .../dialect/dsql/DsqlConnectionPoolManager.java    | 152 +++++++++++++++++++++
 .../jdbc/internal/dialect/dsql/DsqlDialect.java    |  53 +++++++
 .../internal/dialect/dsql/DsqlDialectFactory.java  |  60 ++++++++
 .../dialect/dsql/DsqlJdbcConnectionProvider.java   | 105 ++++++++++++++
 .../dialect/dsql/DsqlJdbcRowConverter.java         |  29 ++++
 .../seatunnel/jdbc/sink/JdbcSinkWriter.java        |  18 ++-
 14 files changed, 706 insertions(+), 32 deletions(-)

diff --git a/.github/workflows/labeler/label-scope-conf.yml 
b/.github/workflows/labeler/label-scope-conf.yml
index 66e2f63e50..67f3c40cb3 100644
--- a/.github/workflows/labeler/label-scope-conf.yml
+++ b/.github/workflows/labeler/label-scope-conf.yml
@@ -327,4 +327,3 @@ sensorsdata:
       - changed-files:
           - any-glob-to-any-file: 
seatunnel-connectors-v2/connector-sensorsdata/**
           - all-globs-to-all-files: 
'!seatunnel-connectors-v2/connector-!(sensorsdata)/**'
-
diff --git a/docs/en/connector-v2/sink/Jdbc.md 
b/docs/en/connector-v2/sink/Jdbc.md
index 0ba6379407..800872c9f2 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -61,6 +61,9 @@ support `Xa transactions`. You can set `is_exactly_once=true` 
to enable it.
 | enable_upsert                             | Boolean | No       | true        
                 |
 | use_copy_statement                        | Boolean | No       | false       
                 |
 | create_index                              | Boolean | No       | true        
                 |
+| access_key_id                             | String  | No       |             
                 |
+| secret_access_key                         | String  | No       |             
                 |
+| region                                    | String  | No       |             
                 |
 
 ### driver [string]
 
@@ -108,7 +111,7 @@ If one dialect not supported by SeaTunnel, it will use the 
default dialect `Gene
 | SqlServer | Tablestore   | Teradata |
 | Vertica   | OceanBase    | XUGU     |
 | IRIS      | Inceptor     | Highgo   |
-
+| DSQL      |              |          |
 ### database [string]
 
 Use this `database` and `table-name` auto-generate sql and receive upstream 
input datas write to database.
@@ -233,6 +236,16 @@ Create the index(contains primary key and any other 
indexes) or not when auto-cr
 
 Notice: Note that this will sacrifice read performance, so you'll need to 
manually create indexes after the table migration to improve read performance
 
+### access_key_id [String]
+The access_key_id in AWS authentication. Only valid for dialect="dsql"
+
+### secret_access_key [String]
+The secret_access_key in AWS authentication. Only valid for dialect="dsql"
+
+### region [String]
+The area where Amazon Aurora DSQL is located. Only valid for dialect="dsql"
+
+
 ## tips
 
 In the case of is_exactly_once = "true", Xa transactions are used. This 
requires database support, and some databases require some setup :
@@ -244,30 +257,31 @@ In the case of is_exactly_once = "true", Xa transactions 
are used. This requires
 
 there are some reference value for params above.
 
-| datasource        |                    driver                    | url       
                                                         | 
xa_data_source_class_name                          | maven                      
                                                                                
                   |
-|-------------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|
-| MySQL             | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | 
com.mysql.cj.jdbc.MysqlXADataSource                | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                                              |
-| PostgreSQL        | org.postgresql.Driver                        | 
jdbc:postgresql://localhost:5432/postgres                          | 
org.postgresql.xa.PGXADataSource                   | 
https://mvnrepository.com/artifact/org.postgresql/postgresql                    
                                              |
-| DM                | dm.jdbc.driver.DmDriver                      | 
jdbc:dm://localhost:5236                                           | 
dm.jdbc.driver.DmdbXADataSource                    | 
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                    
                                              |
-| Phoenix           | org.apache.phoenix.queryserver.client.Driver | 
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /          
                                        | 
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
                                          |
-| SQL Server        | com.microsoft.sqlserver.jdbc.SQLServerDriver | 
jdbc:sqlserver://localhost:1433                                    | 
com.microsoft.sqlserver.jdbc.SQLServerXADataSource | 
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc           
                                              |
-| Oracle            | oracle.jdbc.OracleDriver                     | 
jdbc:oracle:thin:@localhost:1521/xepdb1                            | 
oracle.jdbc.xa.OracleXADataSource                  | 
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8              
                                              |
-| sqlite            | org.sqlite.JDBC                              | 
jdbc:sqlite:test.db                                                | /          
                                        | 
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc                       
                                              |
-| GBase8a           | com.gbase.jdbc.Driver                        | 
jdbc:gbase://e2e_gbase8aDb:5258/test                               | /          
                                        | 
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
                            |
-| StarRocks         | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | /          
                                        | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                                              |
-| db2               | com.ibm.db2.jcc.DB2Driver                    | 
jdbc:db2://localhost:50000/testdb                                  | 
com.ibm.db2.jcc.DB2XADataSource                    | 
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4               
                                              |
-| saphana           | com.sap.db.jdbc.Driver                       | 
jdbc:sap://localhost:39015                                         | /          
                                        | 
https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc                  
                                              |
-| Doris             | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                   | /          
                                        | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                                              |
-| teradata          | com.teradata.jdbc.TeraDriver                 | 
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test              | /          
                                        | 
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc                   
                                              |
-| Redshift          | com.amazon.redshift.jdbc42.Driver            | 
jdbc:redshift://localhost:5439/testdb                              | 
com.amazon.redshift.xa.RedshiftXADataSource        | 
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42          
                                              |
-| Snowflake         | net.snowflake.client.jdbc.SnowflakeDriver    | 
jdbc&#58;snowflake://<account_name>.snowflakecomputing.com         | /          
                                        | 
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc                 
                                              |
-| Vertica           | com.vertica.jdbc.Driver                      | 
jdbc:vertica://localhost:5433                                      | /          
                                        | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
                               |
-| Kingbase          | com.kingbase8.Driver                         | 
jdbc:kingbase8://localhost:54321/db_test                           | /          
                                        | 
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
                                            |
-| OceanBase         | com.oceanbase.jdbc.Driver                    | 
jdbc:oceanbase://localhost:2881                                    | /          
                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
                              |
-| xugu              | com.xugu.cloudjdbc.Driver                    | 
jdbc:xugu://localhost:5138                                         | /          
                                        | 
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar 
                                              |
-| InterSystems IRIS | com.intersystems.jdbc.IRISDriver             | 
jdbc:IRIS://localhost:1972/%SYS                                    | /          
                                        | 
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
 |
-| opengauss         | org.opengauss.Driver                         | 
jdbc:opengauss://localhost:5432/postgres                           | /          
                                        | 
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
                              |
-| Highgo            | com.highgo.jdbc.Driver                       | 
jdbc:highgo://localhost:5866/highgo                                | /          
                                        | 
https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar     
                                              |
+| datasource        |                    driver                    | url       
                                                          | 
xa_data_source_class_name                          | maven                      
                                                                                
                   |
+|-------------------|----------------------------------------------|---------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|
+| MySQL             | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                    | 
com.mysql.cj.jdbc.MysqlXADataSource                | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                                              |
+| PostgreSQL        | org.postgresql.Driver                        | 
jdbc:postgresql://localhost:5432/postgres                           | 
org.postgresql.xa.PGXADataSource                   | 
https://mvnrepository.com/artifact/org.postgresql/postgresql                    
                                              |
+| DM                | dm.jdbc.driver.DmDriver                      | 
jdbc:dm://localhost:5236                                            | 
dm.jdbc.driver.DmdbXADataSource                    | 
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18                    
                                              |
+| Phoenix           | org.apache.phoenix.queryserver.client.Driver | 
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF  | /         
                                         | 
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
                                          |
+| SQL Server        | com.microsoft.sqlserver.jdbc.SQLServerDriver | 
jdbc:sqlserver://localhost:1433                                     | 
com.microsoft.sqlserver.jdbc.SQLServerXADataSource | 
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc           
                                              |
+| Oracle            | oracle.jdbc.OracleDriver                     | 
jdbc:oracle:thin:@localhost:1521/xepdb1                             | 
oracle.jdbc.xa.OracleXADataSource                  | 
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8              
                                              |
+| sqlite            | org.sqlite.JDBC                              | 
jdbc:sqlite:test.db                                                 | /         
                                         | 
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc                       
                                              |
+| GBase8a           | com.gbase.jdbc.Driver                        | 
jdbc:gbase://e2e_gbase8aDb:5258/test                                | /         
                                         | 
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
                            |
+| StarRocks         | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                    | /         
                                         | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                                              |
+| db2               | com.ibm.db2.jcc.DB2Driver                    | 
jdbc:db2://localhost:50000/testdb                                   | 
com.ibm.db2.jcc.DB2XADataSource                    | 
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4               
                                              |
+| saphana           | com.sap.db.jdbc.Driver                       | 
jdbc:sap://localhost:39015                                          | /         
                                         | 
https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc                  
                                              |
+| Doris             | com.mysql.cj.jdbc.Driver                     | 
jdbc:mysql://localhost:3306/test                                    | /         
                                         | 
https://mvnrepository.com/artifact/mysql/mysql-connector-java                   
                                              |
+| teradata          | com.teradata.jdbc.TeraDriver                 | 
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test               | /         
                                         | 
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc                   
                                              |
+| Redshift          | com.amazon.redshift.jdbc42.Driver            | 
jdbc:redshift://localhost:5439/testdb                               | 
com.amazon.redshift.xa.RedshiftXADataSource        | 
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42          
                                              |
+| Snowflake         | net.snowflake.client.jdbc.SnowflakeDriver    | 
jdbc&#58;snowflake://<account_name>.snowflakecomputing.com          | /         
                                         | 
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc                 
                                              |
+| Vertica           | com.vertica.jdbc.Driver                      | 
jdbc:vertica://localhost:5433                                       | /         
                                         | 
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
                               |
+| Kingbase          | com.kingbase8.Driver                         | 
jdbc:kingbase8://localhost:54321/db_test                            | /         
                                         | 
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
                                            |
+| OceanBase         | com.oceanbase.jdbc.Driver                    | 
jdbc:oceanbase://localhost:2881                                     | /         
                                         | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
                              |
+| xugu              | com.xugu.cloudjdbc.Driver                    | 
jdbc:xugu://localhost:5138                                          | /         
                                         | 
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar 
                                              |
+| InterSystems IRIS | com.intersystems.jdbc.IRISDriver             | 
jdbc:IRIS://localhost:1972/%SYS                                     | /         
                                         | 
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
 |
+| opengauss         | org.opengauss.Driver                         | 
jdbc:opengauss://localhost:5432/postgres                            | /         
                                         | 
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
                              |
+| Highgo            | com.highgo.jdbc.Driver                       | 
jdbc:highgo://localhost:5866/highgo                                 | /         
                                         | 
https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar     
                                              |
+| Dsql              | org.postgresql.Driver                        | 
jdbc:postgresql://Amazon Aurora DSQL Cluster Endpoint:5432/postgres | 
org.postgresql.xa.PGXADataSource                   | 
https://mvnrepository.com/artifact/org.postgresql/postgresql                    
                                              |
 
 ## Example
 
@@ -449,6 +463,54 @@ sink {
 }
 ```
 
+#### Dsql example
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    driver = oracle.jdbc.driver.OracleDriver
+    url = "jdbc:oracle:thin:@localhost:1521/XE"
+    user = testUser
+    password = testPassword
+
+    table_list = [
+      {
+        table_path = "TESTSCHEMA.TABLE_1"
+      },
+      {
+        table_path = "TESTSCHEMA.TABLE_2"
+      }
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+    Jdbc {
+        dialect="Dsql"
+        driver = "org.postgresql.Driver"
+        
url="jdbc:postgresql://ixxxxxxxxxxxxx.dsql.us-east-1.on.aws:5432/postgres"
+        username = "admin"
+        access_key_id = "ACCESSKEYIDEXAMPLE"
+        secret_access_key = "SECRETACCESSKEYEXAMPLE"
+        region = "us-east-1"
+        database = "postgres"
+        generate_sink_sql = true
+        primary_keys = ["id"]
+        max_retries = 3
+        batch_size = 1000
+
+    }
+}
+```
+
 ## Changelog
 
 <ChangeLog />
diff --git a/docs/zh/connector-v2/sink/Jdbc.md 
b/docs/zh/connector-v2/sink/Jdbc.md
index 08b45f1012..f5fe62bc41 100644
--- a/docs/zh/connector-v2/sink/Jdbc.md
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -58,6 +58,9 @@ import ChangeLog from '../changelog/connector-jdbc.md';
 | custom_sql                                | String  | 否    | -               
             |
 | enable_upsert                             | Boolean | 否    | true            
             |
 | use_copy_statement                        | Boolean | 否    | false           
             |
+| access_key_id                             | String  | 否       |              
                |
+| secret_access_key                         | String  | 否       |              
                |
+| region                                    | String  | 否       |              
                |
 
 ### driver [string]
 
@@ -105,7 +108,7 @@ Postgres 9.5及以下版本,请设置为 `postgresLow` 来支持 CDC
 | SqlServer | Tablestore | Teradata |
 | Vertica   | OceanBase  | XUGU     |
 | IRIS      | Inceptor   | Highgo   |
-
+| DSQL      |            |          |
 
 ### database [string]
 
@@ -222,6 +225,15 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
 
 注意:不支持 `MAP`、`ARRAY`、`ROW`类型
 
+### access_key_id [String]
+AWS IAM 认证中所需要的access_key_id 。 该参考仅适用于 dialect="dsql"
+
+### secret_access_key [String]
+AWS IAM 认证中所需要的secret_access_key。 该参考仅适用于 dialect="dsql"
+
+### region [String]
+Amazon Aurora DSQL 所在的区域。 该参考仅适用于 dialect="dsql"
+
 ## tips
 
 在 is_exactly_once = "true" 的情况下,使用 XA 事务。这需要数据库支持,有些数据库需要一些设置:<br/>
@@ -256,6 +268,7 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
 | OceanBase  | com.oceanbase.jdbc.Driver                    | 
jdbc:oceanbase://localhost:2881                                    | /          
                                        | 
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
   |
 | opengauss  | org.opengauss.Driver                         | 
jdbc:opengauss://localhost:5432/postgres                           | /          
                                        | 
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
   |
 | Highgo     | com.highgo.jdbc.Driver                       | 
jdbc:highgo://localhost:5866/highgo                                | /          
                                        | 
https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar     
                   |
+| Dsql       | org.postgresql.Driver                        | 
jdbc:postgresql://Amazon Aurora DSQL Cluster Endpoint:5432/postgres | 
org.postgresql.xa.PGXADataSource                   | 
https://mvnrepository.com/artifact/org.postgresql/postgresql                    
                                              |
 
 ## 示例
 
@@ -354,6 +367,55 @@ sink {
 
 ```
 
+
+#### Dsql 示例
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Jdbc {
+    driver = oracle.jdbc.driver.OracleDriver
+    url = "jdbc:oracle:thin:@localhost:1521/XE"
+    user = testUser
+    password = testPassword
+
+    table_list = [
+      {
+        table_path = "TESTSCHEMA.TABLE_1"
+      },
+      {
+        table_path = "TESTSCHEMA.TABLE_2"
+      }
+    ]
+  }
+}
+
+transform {
+}
+
+sink {
+    Jdbc {
+        dialect="Dsql"
+        driver = "org.postgresql.Driver"
+        
url="jdbc:postgresql://ixxxxxxxxxxxxx.dsql.us-east-1.on.aws:5432/postgres"
+        username = "admin"
+        access_key_id = "ACCESSKEYIDEXAMPLE"
+        secret_access_key = "SECRETACCESSKEYEXAMPLE"
+        region = "us-east-1"
+        database = "postgres"
+        generate_sink_sql = true
+        primary_keys = ["id"]
+        max_retries = 3
+        batch_size = 1000
+
+    }
+}
+```
+
 ## 变更日志
 
 <ChangeLog />
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml 
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index e8aa6b3a46..533ae6344c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -58,6 +58,7 @@
         <highgo.version>6.2.3</highgo.version>
         <presto.version>0.279</presto.version>
         <trino.version>460</trino.version>
+        <aws.sdk.version>2.31.30</aws.sdk.version>
     </properties>
 
     <dependencyManagement>
@@ -375,6 +376,29 @@
             <groupId>io.trino</groupId>
             <artifactId>trino-jdbc</artifactId>
         </dependency>
+        <!-- AWS SDK for DSQL -->
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>dsql</artifactId>
+            <version>${aws.sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>auth</artifactId>
+            <version>${aws.sdk.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>regions</artifactId>
+            <version>${aws.sdk.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sts</artifactId>
+            <version>${aws.sdk.version}</version>
+        </dependency>
     </dependencies>
 
 </project>
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcCommonOptions.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcCommonOptions.java
index d7f7bbe835..55abfc4694 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcCommonOptions.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcCommonOptions.java
@@ -129,6 +129,20 @@ public class JdbcCommonOptions {
                     .mapType()
                     .noDefaultValue()
                     .withDescription("additional connection configuration 
parameters");
+    public static final Option<String> ACCESS_KEY_ID =
+            Options.key("access_key_id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("access_key_id");
+
+    public static final Option<String> SECRET_ACCESS_KEY =
+            Options.key("secret_access_key")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("secret_access_key");
+
+    public static final Option<String> REGION =
+            
Options.key("region").stringType().noDefaultValue().withDescription("region");
 
     public static final OptionRule.Builder BASE_CATALOG_RULE =
             OptionRule.builder()
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
index 3f6cc70c6b..b3b49e465f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
@@ -64,6 +64,9 @@ public class JdbcConnectionConfig implements Serializable {
     private String dialect = JdbcCommonOptions.DIALECT.defaultValue();
 
     private Map<String, String> properties;
+    private String region;
+    private String accessKeyId;
+    private String secretAccessKey;
 
     private boolean handleBlobAsString = 
JdbcCommonOptions.HANDLE_BLOB_AS_STRING.defaultValue();
 
@@ -98,6 +101,10 @@ public class JdbcConnectionConfig implements Serializable {
         config.getOptional(JdbcCommonOptions.INT_TYPE_NARROWING)
                 .ifPresent(builder::intTypeNarrowing);
         
config.getOptional(JdbcCommonOptions.DIALECT).ifPresent(builder::dialect);
+        
config.getOptional(JdbcCommonOptions.ACCESS_KEY_ID).ifPresent(builder::accessKeyId);
+        
config.getOptional(JdbcCommonOptions.SECRET_ACCESS_KEY).ifPresent(builder::secretAccessKey);
+        
config.getOptional(JdbcCommonOptions.REGION).ifPresent(builder::region);
+
         return builder.build();
     }
 
@@ -142,6 +149,9 @@ public class JdbcConnectionConfig implements Serializable {
         public String kerberosKeytabPath;
         public String krb5Path = JdbcCommonOptions.KRB5_PATH.defaultValue();
         public String dialect = JdbcCommonOptions.DIALECT.defaultValue();
+        private String region;
+        private String accessKeyId;
+        private String secretAccessKey;
 
         private Builder() {}
 
@@ -255,6 +265,21 @@ public class JdbcConnectionConfig implements Serializable {
             return this;
         }
 
+        public Builder region(String region) {
+            this.region = region;
+            return this;
+        }
+
+        public Builder accessKeyId(String accessKeyId) {
+            this.accessKeyId = accessKeyId;
+            return this;
+        }
+
+        public Builder secretAccessKey(String secretAccessKey) {
+            this.secretAccessKey = secretAccessKey;
+            return this;
+        }
+
         public JdbcConnectionConfig build() {
             JdbcConnectionConfig jdbcConnectionConfig = new 
JdbcConnectionConfig();
             jdbcConnectionConfig.batchSize = this.batchSize;
@@ -279,6 +304,10 @@ public class JdbcConnectionConfig implements Serializable {
             jdbcConnectionConfig.dialect = this.dialect;
             jdbcConnectionConfig.properties =
                     this.properties == null ? new HashMap<>() : 
this.properties;
+
+            jdbcConnectionConfig.region = this.region;
+            jdbcConnectionConfig.accessKeyId = this.accessKeyId;
+            jdbcConnectionConfig.secretAccessKey = this.secretAccessKey;
             return jdbcConnectionConfig;
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index 55d2ee7865..bddee48b23 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -47,4 +47,5 @@ public class DatabaseIdentifier {
     public static final String HIGHGO = "Highgo";
     public static final String GREENPLUM = "Greenplum";
     public static final String PRESTO = "Presto";
+    public static final String DSQL = "Dsql";
 }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DdsqlJdbcConnectionPoolProviderProxy.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DdsqlJdbcConnectionPoolProviderProxy.java
new file mode 100644
index 0000000000..7a8047a78a
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DdsqlJdbcConnectionPoolProviderProxy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+@Slf4j
+public class DdsqlJdbcConnectionPoolProviderProxy implements 
JdbcConnectionProvider {
+
+    private final transient DsqlConnectionPoolManager poolManager;
+    private final JdbcConnectionConfig jdbcConfig;
+    private final int queueIndex;
+
+    public DdsqlJdbcConnectionPoolProviderProxy(JdbcConnectionConfig 
jdbcConfig, int queueIndex) {
+
+        this.jdbcConfig = jdbcConfig;
+        this.poolManager = new DsqlConnectionPoolManager(jdbcConfig);
+        this.queueIndex = queueIndex;
+    }
+
+    @Override
+    public Connection getConnection() {
+        return poolManager.getConnection(queueIndex);
+    }
+
+    @Override
+    public boolean isConnectionValid() throws SQLException {
+        return poolManager.containsConnection(queueIndex)
+                && poolManager
+                        .getConnection(queueIndex)
+                        
.isValid(jdbcConfig.getConnectionCheckTimeoutSeconds());
+    }
+
+    @Override
+    public Connection getOrEstablishConnection() {
+        return poolManager.getConnection(queueIndex);
+    }
+
+    @Override
+    public void closeConnection() {
+        if (poolManager.containsConnection(queueIndex)) {
+            try {
+                poolManager.remove(queueIndex).close();
+            } catch (SQLException e) {
+                log.warn("JDBC connection close failed.", e);
+            }
+        }
+    }
+
+    @Override
+    public Connection reestablishConnection() {
+        closeConnection();
+        return getOrEstablishConnection();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlConnectionPoolManager.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlConnectionPoolManager.java
new file mode 100644
index 0000000000..2fbd46fedb
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlConnectionPoolManager.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql;
+
+import org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dsql.DsqlUtilities;
+import software.amazon.awssdk.services.dsql.model.GenerateAuthTokenRequest;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Getter
+public class DsqlConnectionPoolManager {
+
+    private HikariDataSource connectionPool;
+    private Map<Integer, Connection> connectionMap;
+    private AwsCredentialsProvider provider;
+    private DsqlUtilities dsqlUtilities;
+    private JdbcConnectionConfig jdbcConfig;
+    private ScheduledExecutorService tokenRefreshExecutor;
+
+    DsqlConnectionPoolManager(JdbcConnectionConfig jdbcConfig) {
+        initAWSInfo(jdbcConfig);
+        this.connectionPool = new HikariDataSource();
+        this.connectionPool.setIdleTimeout(30 * 1000);
+        this.connectionPool.setMaximumPoolSize(10);
+        this.connectionPool.setJdbcUrl(jdbcConfig.getUrl());
+        this.connectionPool.setPassword(generateAuthToken(getDBHost()));
+        this.connectionPool.setDriverClassName(jdbcConfig.getDriverName());
+        this.connectionPool.setUsername(jdbcConfig.getUsername().get());
+        this.connectionPool.setAutoCommit(jdbcConfig.isAutoCommit());
+        this.connectionMap = new ConcurrentHashMap<>();
+        this.tokenRefreshExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        r -> {
+                            Thread t = new Thread(r, "dsql-token-refresh");
+                            t.setDaemon(true);
+                            return t;
+                        });
+        // Schedule token refresh every 10 minutes (tokens are valid for 15 
minutes)
+        tokenRefreshExecutor.scheduleAtFixedRate(this::resetPassword, 10, 10, 
TimeUnit.MINUTES);
+    }
+
+    public void initAWSInfo(JdbcConnectionConfig jdbcConfig) {
+        this.jdbcConfig = jdbcConfig;
+        this.provider =
+                new AwsCredentialsProvider() {
+                    @Override
+                    public AwsCredentials resolveCredentials() {
+                        return AwsBasicCredentials.create(
+                                jdbcConfig.getAccessKeyId(), 
jdbcConfig.getSecretAccessKey());
+                    }
+                };
+        this.dsqlUtilities =
+                this.dsqlUtilities =
+                        DsqlUtilities.builder()
+                                .region(Region.of(jdbcConfig.getRegion()))
+                                .credentialsProvider(provider)
+                                .build();
+    }
+
+    private void resetPassword() {
+        
connectionPool.getHikariConfigMXBean().setPassword(generateAuthToken(getDBHost()));
+        log.warn("Reset password for dsql connection successfully!");
+    }
+
+    private String getDBHost() {
+        String url = jdbcConfig.getUrl();
+        JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(url);
+        return urlInfo.getHost();
+    }
+
+    private String generateAuthToken(String clusterEndpoint) {
+
+        GenerateAuthTokenRequest tokenGenerator =
+                GenerateAuthTokenRequest.builder()
+                        .hostname(clusterEndpoint)
+                        .region(Region.of(jdbcConfig.getRegion()))
+                        .credentialsProvider(this.provider)
+                        .build();
+
+        if ("admin".equals(jdbcConfig.getUsername().get())) {
+            return 
dsqlUtilities.generateDbConnectAdminAuthToken(tokenGenerator);
+        } else {
+            return dsqlUtilities.generateDbConnectAuthToken(tokenGenerator);
+        }
+    }
+
+    public Connection getConnection(int index) {
+        return connectionMap.computeIfAbsent(
+                index,
+                i -> {
+                    try {
+                        return connectionPool.getConnection();
+                    } catch (SQLException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    public boolean containsConnection(int index) {
+        return connectionMap.containsKey(index);
+    }
+
+    public Connection remove(int index) {
+        return connectionMap.remove(index);
+    }
+
+    public String getPoolName() {
+        return connectionPool.getPoolName();
+    }
+
+    public void close() {
+        if (!connectionPool.isClosed()) {
+            connectionPool.close();
+        }
+        if (!tokenRefreshExecutor.isShutdown()) {
+            tokenRefreshExecutor.shutdownNow();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlDialect.java
new file mode 100644
index 0000000000..928a372b61
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlDialect.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;
+
+public class DsqlDialect extends PostgresDialect {
+
+    public DsqlDialect(String fieldIde) {
+        this.fieldIde = fieldIde;
+    }
+
+    @Override
+    public String dialectName() {
+        return DatabaseIdentifier.DSQL;
+    }
+
+    @Override
+    public JdbcRowConverter getRowConverter() {
+        return new DsqlJdbcRowConverter();
+    }
+
+    @Override
+    public JdbcConnectionProvider getJdbcConnectionProvider(
+            JdbcConnectionConfig jdbcConnectionConfig) {
+        return new DsqlJdbcConnectionProvider(jdbcConnectionConfig);
+    }
+
+    @Override
+    public String tableIdentifier(String database, String tableName) {
+
+        return quoteIdentifier(tableName);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlDialectFactory.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlDialectFactory.java
new file mode 100644
index 0000000000..5b60a0c2a8
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlDialectFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+import javax.annotation.Nonnull;
+
+import java.util.regex.Pattern;
+
+@AutoService(JdbcDialectFactory.class)
+public class DsqlDialectFactory implements JdbcDialectFactory {
+
+    private static final Pattern DSQL_PATTERN = 
Pattern.compile(".*dsql\\.[a-z0-9-]+\\.on\\.aws.*");
+
+    @Override
+    public String dialectFactoryName() {
+        return DatabaseIdentifier.DSQL;
+    }
+
+    @Override
+    public boolean acceptsURL(String url) {
+        return url.startsWith("jdbc:postgresql:") && containsDsql(url);
+    }
+
+    @Override
+    public JdbcDialect create() {
+        throw new UnsupportedOperationException(
+                "Can't create JdbcDialect without compatible mode for Dsql");
+    }
+
+    @Override
+    public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) 
{
+
+        return new DsqlDialect(fieldIde);
+    }
+
+    private boolean containsDsql(String url) {
+        return DSQL_PATTERN.matcher(url).matches();
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlJdbcConnectionProvider.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlJdbcConnectionProvider.java
new file mode 100644
index 0000000000..d4b6be8e17
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlJdbcConnectionProvider.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql;
+
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+
+import lombok.NonNull;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dsql.DsqlUtilities;
+import software.amazon.awssdk.services.dsql.model.GenerateAuthTokenRequest;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.SQLException;
+import java.util.Properties;
+
+public class DsqlJdbcConnectionProvider extends SimpleJdbcConnectionProvider {
+
+    private AwsCredentialsProvider provider;
+    private DsqlUtilities dsqlUtilities;
+
+    public DsqlJdbcConnectionProvider(@NonNull JdbcConnectionConfig 
jdbcConfig) {
+        super(jdbcConfig);
+        this.provider =
+                new AwsCredentialsProvider() {
+                    @Override
+                    public AwsCredentials resolveCredentials() {
+                        return AwsBasicCredentials.create(
+                                jdbcConfig.getAccessKeyId(), 
jdbcConfig.getSecretAccessKey());
+                    }
+                };
+        this.dsqlUtilities =
+                DsqlUtilities.builder()
+                        .region(Region.of(jdbcConfig.getRegion()))
+                        .credentialsProvider(provider)
+                        .build();
+    }
+
+    @Override
+    public Connection getOrEstablishConnection() throws SQLException, 
ClassNotFoundException {
+        if (isConnectionValid()) {
+            return connection;
+        }
+        Driver driver = getLoadedDriver();
+        Properties info = new Properties();
+        if (jdbcConfig.getUsername().isPresent()) {
+            info.setProperty("user", jdbcConfig.getUsername().get());
+        }
+        String url = jdbcConfig.getUrl();
+        JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(url);
+        info.setProperty("password", generateAuthToken(urlInfo.getHost()));
+
+        info.putAll(jdbcConfig.getProperties());
+
+        connection = driver.connect(url, info);
+        if (connection == null) {
+            // Throw same exception as DriverManager.getConnection when no 
driver found to match
+            // caller expectation.
+            throw new JdbcConnectorException(
+                    JdbcConnectorErrorCode.NO_SUITABLE_DRIVER,
+                    "No suitable driver found for " + url);
+        }
+
+        connection.setAutoCommit(jdbcConfig.isAutoCommit());
+
+        return connection;
+    }
+
+    private String generateAuthToken(String clusterEndpoint) {
+        JdbcConnectionConfig jdbcConfig = super.getJdbcConfig();
+        GenerateAuthTokenRequest tokenGenerator =
+                GenerateAuthTokenRequest.builder()
+                        .hostname(clusterEndpoint)
+                        .region(Region.of(jdbcConfig.getRegion()))
+                        .credentialsProvider(this.provider)
+                        .build();
+
+        if ("admin".equals(jdbcConfig.getUsername().get())) {
+            return 
dsqlUtilities.generateDbConnectAdminAuthToken(tokenGenerator);
+        } else {
+            return dsqlUtilities.generateDbConnectAuthToken(tokenGenerator);
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlJdbcRowConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlJdbcRowConverter.java
new file mode 100644
index 0000000000..3f2a3ad4b5
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlJdbcRowConverter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql;
+
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresJdbcRowConverter;
+
+public class DsqlJdbcRowConverter extends PostgresJdbcRowConverter {
+
+    @Override
+    public String converterName() {
+        return DatabaseIdentifier.DSQL;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index d367acb006..49d8b47ef6 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -32,7 +32,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErr
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionPoolProviderProxy;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql.DdsqlJdbcConnectionPoolProviderProxy;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
 
@@ -106,11 +108,17 @@ public class JdbcSinkWriter extends 
AbstractJdbcSinkWriter<ConnectionPoolManager
             MultiTableResourceManager<ConnectionPoolManager> 
multiTableResourceManager,
             int queueIndex) {
         connectionProvider.closeConnection();
-        this.connectionProvider =
-                new SimpleJdbcConnectionPoolProviderProxy(
-                        multiTableResourceManager.getSharedResource().get(),
-                        jdbcSinkConfig.getJdbcConnectionConfig(),
-                        queueIndex);
+        if (this.dialect.dialectName().equals(DatabaseIdentifier.DSQL)) {
+            this.connectionProvider =
+                    new DdsqlJdbcConnectionPoolProviderProxy(
+                            jdbcSinkConfig.getJdbcConnectionConfig(), 
queueIndex);
+        } else {
+            this.connectionProvider =
+                    new SimpleJdbcConnectionPoolProviderProxy(
+                            
multiTableResourceManager.getSharedResource().get(),
+                            jdbcSinkConfig.getJdbcConnectionConfig(),
+                            queueIndex);
+        }
         this.outputFormat =
                 new JdbcOutputFormatBuilder(
                                 dialect,

Reply via email to