This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ff47b4a842 [Feature][Connector-V2] Support sink connector for AWS DSQL
#9726 (#9739)
ff47b4a842 is described below
commit ff47b4a842e07f1b4cfd866232994a1bf5de5081
Author: cloud456 <[email protected]>
AuthorDate: Sat Nov 1 14:52:18 2025 +0800
[Feature][Connector-V2] Support sink connector for AWS DSQL #9726 (#9739)
Co-authored-by: cloud456 <[email protected]>
Co-authored-by: cloud456 <[email protected]>
Co-authored-by: David Zollo <[email protected]>
---
.github/workflows/labeler/label-scope-conf.yml | 1 -
docs/en/connector-v2/sink/Jdbc.md | 112 +++++++++++----
docs/zh/connector-v2/sink/Jdbc.md | 64 ++++++++-
seatunnel-connectors-v2/connector-jdbc/pom.xml | 24 ++++
.../seatunnel/jdbc/config/JdbcCommonOptions.java | 14 ++
.../jdbc/config/JdbcConnectionConfig.java | 29 ++++
.../jdbc/internal/dialect/DatabaseIdentifier.java | 1 +
.../dsql/DdsqlJdbcConnectionPoolProviderProxy.java | 76 +++++++++++
.../dialect/dsql/DsqlConnectionPoolManager.java | 152 +++++++++++++++++++++
.../jdbc/internal/dialect/dsql/DsqlDialect.java | 53 +++++++
.../internal/dialect/dsql/DsqlDialectFactory.java | 60 ++++++++
.../dialect/dsql/DsqlJdbcConnectionProvider.java | 105 ++++++++++++++
.../dialect/dsql/DsqlJdbcRowConverter.java | 29 ++++
.../seatunnel/jdbc/sink/JdbcSinkWriter.java | 18 ++-
14 files changed, 706 insertions(+), 32 deletions(-)
diff --git a/.github/workflows/labeler/label-scope-conf.yml
b/.github/workflows/labeler/label-scope-conf.yml
index 66e2f63e50..67f3c40cb3 100644
--- a/.github/workflows/labeler/label-scope-conf.yml
+++ b/.github/workflows/labeler/label-scope-conf.yml
@@ -327,4 +327,3 @@ sensorsdata:
- changed-files:
- any-glob-to-any-file:
seatunnel-connectors-v2/connector-sensorsdata/**
- all-globs-to-all-files:
'!seatunnel-connectors-v2/connector-!(sensorsdata)/**'
-
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index 0ba6379407..800872c9f2 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -61,6 +61,9 @@ support `Xa transactions`. You can set `is_exactly_once=true`
to enable it.
| enable_upsert | Boolean | No | true
|
| use_copy_statement | Boolean | No | false
|
| create_index | Boolean | No | true
|
+| access_key_id | String | No |
|
+| secret_access_key | String | No |
|
+| region | String | No |
|
### driver [string]
@@ -108,7 +111,7 @@ If one dialect not supported by SeaTunnel, it will use the
default dialect `Gene
| SqlServer | Tablestore | Teradata |
| Vertica | OceanBase | XUGU |
| IRIS | Inceptor | Highgo |
-
+| DSQL | | |
### database [string]
Use this `database` and `table-name` auto-generate sql and receive upstream
input datas write to database.
@@ -233,6 +236,16 @@ Create the index(contains primary key and any other
indexes) or not when auto-cr
Notice: Note that this will sacrifice read performance, so you'll need to
manually create indexes after the table migration to improve read performance
+### access_key_id [String]
+The access_key_id in AWS authentication. Only valid for dialect="dsql"
+
+### secret_access_key [String]
+The secret_access_key in AWS authentication. Only valid for dialect="dsql"
+
+### region [String]
+The area where Amazon Aurora DSQL is located. Only valid for dialect="dsql"
+
+
## tips
In the case of is_exactly_once = "true", Xa transactions are used. This
requires database support, and some databases require some setup :
@@ -244,30 +257,31 @@ In the case of is_exactly_once = "true", Xa transactions
are used. This requires
there are some reference value for params above.
-| datasource | driver | url
|
xa_data_source_class_name | maven
|
-|-------------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|
-| MySQL | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
com.mysql.cj.jdbc.MysqlXADataSource |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
-| PostgreSQL | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/postgres |
org.postgresql.xa.PGXADataSource |
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
-| DM | dm.jdbc.driver.DmDriver |
jdbc:dm://localhost:5236 |
dm.jdbc.driver.DmdbXADataSource |
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18
|
-| Phoenix | org.apache.phoenix.queryserver.client.Driver |
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /
|
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
|
-| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver |
jdbc:sqlserver://localhost:1433 |
com.microsoft.sqlserver.jdbc.SQLServerXADataSource |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
|
-| Oracle | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@localhost:1521/xepdb1 |
oracle.jdbc.xa.OracleXADataSource |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
|
-| sqlite | org.sqlite.JDBC |
jdbc:sqlite:test.db | /
|
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc
|
-| GBase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test | /
|
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
|
-| StarRocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
-| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
com.ibm.db2.jcc.DB2XADataSource |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
-| saphana | com.sap.db.jdbc.Driver |
jdbc:sap://localhost:39015 | /
|
https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc
|
-| Doris | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
-| teradata | com.teradata.jdbc.TeraDriver |
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | /
|
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc
|
-| Redshift | com.amazon.redshift.jdbc42.Driver |
jdbc:redshift://localhost:5439/testdb |
com.amazon.redshift.xa.RedshiftXADataSource |
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42
|
-| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver |
jdbc:snowflake://<account_name>.snowflakecomputing.com | /
|
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc
|
-| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 | /
|
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
-| Kingbase | com.kingbase8.Driver |
jdbc:kingbase8://localhost:54321/db_test | /
|
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
|
-| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
|
-| xugu | com.xugu.cloudjdbc.Driver |
jdbc:xugu://localhost:5138 | /
|
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar
|
-| InterSystems IRIS | com.intersystems.jdbc.IRISDriver |
jdbc:IRIS://localhost:1972/%SYS | /
|
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
|
-| opengauss | org.opengauss.Driver |
jdbc:opengauss://localhost:5432/postgres | /
|
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
|
-| Highgo | com.highgo.jdbc.Driver |
jdbc:highgo://localhost:5866/highgo | /
|
https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar
|
+| datasource | driver | url
|
xa_data_source_class_name | maven
|
+|-------------------|----------------------------------------------|---------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|
+| MySQL | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
com.mysql.cj.jdbc.MysqlXADataSource |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| PostgreSQL | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/postgres |
org.postgresql.xa.PGXADataSource |
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
+| DM | dm.jdbc.driver.DmDriver |
jdbc:dm://localhost:5236 |
dm.jdbc.driver.DmdbXADataSource |
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18
|
+| Phoenix | org.apache.phoenix.queryserver.client.Driver |
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /
|
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
|
+| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver |
jdbc:sqlserver://localhost:1433 |
com.microsoft.sqlserver.jdbc.SQLServerXADataSource |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
|
+| Oracle | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@localhost:1521/xepdb1 |
oracle.jdbc.xa.OracleXADataSource |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
|
+| sqlite | org.sqlite.JDBC |
jdbc:sqlite:test.db | /
|
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc
|
+| GBase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test | /
|
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
|
+| StarRocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
com.ibm.db2.jcc.DB2XADataSource |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
+| saphana | com.sap.db.jdbc.Driver |
jdbc:sap://localhost:39015 | /
|
https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc
|
+| Doris | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| teradata | com.teradata.jdbc.TeraDriver |
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | /
|
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc
|
+| Redshift | com.amazon.redshift.jdbc42.Driver |
jdbc:redshift://localhost:5439/testdb |
com.amazon.redshift.xa.RedshiftXADataSource |
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42
|
+| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver |
jdbc:snowflake://<account_name>.snowflakecomputing.com | /
|
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc
|
+| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 | /
|
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
+| Kingbase | com.kingbase8.Driver |
jdbc:kingbase8://localhost:54321/db_test | /
|
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
|
+| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
|
+| xugu | com.xugu.cloudjdbc.Driver |
jdbc:xugu://localhost:5138 | /
|
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar
|
+| InterSystems IRIS | com.intersystems.jdbc.IRISDriver |
jdbc:IRIS://localhost:1972/%SYS | /
|
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
|
+| opengauss | org.opengauss.Driver |
jdbc:opengauss://localhost:5432/postgres | /
|
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
|
+| Highgo | com.highgo.jdbc.Driver |
jdbc:highgo://localhost:5866/highgo | /
|
https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar
|
+| Dsql | org.postgresql.Driver |
jdbc:postgresql://Amazon Aurora DSQL Cluster Endpoint:5432/postgres |
org.postgresql.xa.PGXADataSource |
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
## Example
@@ -449,6 +463,54 @@ sink {
}
```
+#### Dsql example
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ driver = oracle.jdbc.driver.OracleDriver
+ url = "jdbc:oracle:thin:@localhost:1521/XE"
+ user = testUser
+ password = testPassword
+
+ table_list = [
+ {
+ table_path = "TESTSCHEMA.TABLE_1"
+ },
+ {
+ table_path = "TESTSCHEMA.TABLE_2"
+ }
+ ]
+ }
+}
+
+transform {
+}
+
+sink {
+ Jdbc {
+ dialect="Dsql"
+ driver = "org.postgresql.Driver"
+
url="jdbc:postgresql://ixxxxxxxxxxxxx.dsql.us-east-1.on.aws:5432/postgres"
+ username = "admin"
+ access_key_id = "ACCESSKEYIDEXAMPLE"
+ secret_access_key = "SECRETACCESSKEYEXAMPLE"
+ region = "us-east-1"
+ database = "postgres"
+ generate_sink_sql = true
+ primary_keys = ["id"]
+ max_retries = 3
+ batch_size = 1000
+
+ }
+}
+```
+
## Changelog
<ChangeLog />
diff --git a/docs/zh/connector-v2/sink/Jdbc.md
b/docs/zh/connector-v2/sink/Jdbc.md
index 08b45f1012..f5fe62bc41 100644
--- a/docs/zh/connector-v2/sink/Jdbc.md
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -58,6 +58,9 @@ import ChangeLog from '../changelog/connector-jdbc.md';
| custom_sql | String | 否 | -
|
| enable_upsert | Boolean | 否 | true
|
| use_copy_statement | Boolean | 否 | false
|
+| access_key_id | String | 否 |
|
+| secret_access_key | String | 否 |
|
+| region | String | 否 |
|
### driver [string]
@@ -105,7 +108,7 @@ Postgres 9.5及以下版本,请设置为 `postgresLow` 来支持 CDC
| SqlServer | Tablestore | Teradata |
| Vertica | OceanBase | XUGU |
| IRIS | Inceptor | Highgo |
-
+| DSQL | | |
### database [string]
@@ -222,6 +225,15 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
注意:不支持 `MAP`、`ARRAY`、`ROW`类型
+### access_key_id [String]
+AWS IAM 认证中所需要的access_key_id 。 该参考仅适用于 dialect="dsql"
+
+### secret_access_key [String]
+AWS IAM 认证中所需要的secret_access_key。 该参考仅适用于 dialect="dsql"
+
+### region [String]
+Amazon Aurora DSQL 所在的区域。 该参考仅适用于 dialect="dsql"
+
## tips
在 is_exactly_once = "true" 的情况下,使用 XA 事务。这需要数据库支持,有些数据库需要一些设置:<br/>
@@ -256,6 +268,7 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.12/oceanbase-client-2.4.12.jar
|
| opengauss | org.opengauss.Driver |
jdbc:opengauss://localhost:5432/postgres | /
|
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
|
| Highgo | com.highgo.jdbc.Driver |
jdbc:highgo://localhost:5866/highgo | /
|
https://repo1.maven.org/maven2/com/highgo/HgdbJdbc/6.2.3/HgdbJdbc-6.2.3.jar
|
+| Dsql | org.postgresql.Driver |
jdbc:postgresql://Amazon Aurora DSQL Cluster Endpoint:5432/postgres |
org.postgresql.xa.PGXADataSource |
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
## 示例
@@ -354,6 +367,55 @@ sink {
```
+
+#### Dsql 示例
+
+```hocon
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ driver = oracle.jdbc.driver.OracleDriver
+ url = "jdbc:oracle:thin:@localhost:1521/XE"
+ user = testUser
+ password = testPassword
+
+ table_list = [
+ {
+ table_path = "TESTSCHEMA.TABLE_1"
+ },
+ {
+ table_path = "TESTSCHEMA.TABLE_2"
+ }
+ ]
+ }
+}
+
+transform {
+}
+
+sink {
+ Jdbc {
+ dialect="Dsql"
+ driver = "org.postgresql.Driver"
+
url="jdbc:postgresql://ixxxxxxxxxxxxx.dsql.us-east-1.on.aws:5432/postgres"
+ username = "admin"
+ access_key_id = "ACCESSKEYIDEXAMPLE"
+ secret_access_key = "SECRETACCESSKEYEXAMPLE"
+ region = "us-east-1"
+ database = "postgres"
+ generate_sink_sql = true
+ primary_keys = ["id"]
+ max_retries = 3
+ batch_size = 1000
+
+ }
+}
+```
+
## 变更日志
<ChangeLog />
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index e8aa6b3a46..533ae6344c 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -58,6 +58,7 @@
<highgo.version>6.2.3</highgo.version>
<presto.version>0.279</presto.version>
<trino.version>460</trino.version>
+ <aws.sdk.version>2.31.30</aws.sdk.version>
</properties>
<dependencyManagement>
@@ -375,6 +376,29 @@
<groupId>io.trino</groupId>
<artifactId>trino-jdbc</artifactId>
</dependency>
+ <!-- AWS SDK for DSQL -->
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>dsql</artifactId>
+ <version>${aws.sdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>auth</artifactId>
+ <version>${aws.sdk.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>regions</artifactId>
+ <version>${aws.sdk.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>sts</artifactId>
+ <version>${aws.sdk.version}</version>
+ </dependency>
</dependencies>
</project>
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcCommonOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcCommonOptions.java
index d7f7bbe835..55abfc4694 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcCommonOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcCommonOptions.java
@@ -129,6 +129,20 @@ public class JdbcCommonOptions {
.mapType()
.noDefaultValue()
.withDescription("additional connection configuration
parameters");
+ public static final Option<String> ACCESS_KEY_ID =
+ Options.key("access_key_id")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("access_key_id");
+
+ public static final Option<String> SECRET_ACCESS_KEY =
+ Options.key("secret_access_key")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("secret_access_key");
+
+ public static final Option<String> REGION =
+
Options.key("region").stringType().noDefaultValue().withDescription("region");
public static final OptionRule.Builder BASE_CATALOG_RULE =
OptionRule.builder()
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
index 3f6cc70c6b..b3b49e465f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
@@ -64,6 +64,9 @@ public class JdbcConnectionConfig implements Serializable {
private String dialect = JdbcCommonOptions.DIALECT.defaultValue();
private Map<String, String> properties;
+ private String region;
+ private String accessKeyId;
+ private String secretAccessKey;
private boolean handleBlobAsString =
JdbcCommonOptions.HANDLE_BLOB_AS_STRING.defaultValue();
@@ -98,6 +101,10 @@ public class JdbcConnectionConfig implements Serializable {
config.getOptional(JdbcCommonOptions.INT_TYPE_NARROWING)
.ifPresent(builder::intTypeNarrowing);
config.getOptional(JdbcCommonOptions.DIALECT).ifPresent(builder::dialect);
+
config.getOptional(JdbcCommonOptions.ACCESS_KEY_ID).ifPresent(builder::accessKeyId);
+
config.getOptional(JdbcCommonOptions.SECRET_ACCESS_KEY).ifPresent(builder::secretAccessKey);
+
config.getOptional(JdbcCommonOptions.REGION).ifPresent(builder::region);
+
return builder.build();
}
@@ -142,6 +149,9 @@ public class JdbcConnectionConfig implements Serializable {
public String kerberosKeytabPath;
public String krb5Path = JdbcCommonOptions.KRB5_PATH.defaultValue();
public String dialect = JdbcCommonOptions.DIALECT.defaultValue();
+ private String region;
+ private String accessKeyId;
+ private String secretAccessKey;
private Builder() {}
@@ -255,6 +265,21 @@ public class JdbcConnectionConfig implements Serializable {
return this;
}
+ public Builder region(String region) {
+ this.region = region;
+ return this;
+ }
+
+ public Builder accessKeyId(String accessKeyId) {
+ this.accessKeyId = accessKeyId;
+ return this;
+ }
+
+ public Builder secretAccessKey(String secretAccessKey) {
+ this.secretAccessKey = secretAccessKey;
+ return this;
+ }
+
public JdbcConnectionConfig build() {
JdbcConnectionConfig jdbcConnectionConfig = new
JdbcConnectionConfig();
jdbcConnectionConfig.batchSize = this.batchSize;
@@ -279,6 +304,10 @@ public class JdbcConnectionConfig implements Serializable {
jdbcConnectionConfig.dialect = this.dialect;
jdbcConnectionConfig.properties =
this.properties == null ? new HashMap<>() :
this.properties;
+
+ jdbcConnectionConfig.region = this.region;
+ jdbcConnectionConfig.accessKeyId = this.accessKeyId;
+ jdbcConnectionConfig.secretAccessKey = this.secretAccessKey;
return jdbcConnectionConfig;
}
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index 55d2ee7865..bddee48b23 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -47,4 +47,5 @@ public class DatabaseIdentifier {
public static final String HIGHGO = "Highgo";
public static final String GREENPLUM = "Greenplum";
public static final String PRESTO = "Presto";
+ public static final String DSQL = "Dsql";
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DdsqlJdbcConnectionPoolProviderProxy.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DdsqlJdbcConnectionPoolProviderProxy.java
new file mode 100644
index 0000000000..7a8047a78a
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DdsqlJdbcConnectionPoolProviderProxy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+@Slf4j
+public class DdsqlJdbcConnectionPoolProviderProxy implements
JdbcConnectionProvider {
+
+ private final transient DsqlConnectionPoolManager poolManager;
+ private final JdbcConnectionConfig jdbcConfig;
+ private final int queueIndex;
+
+ public DdsqlJdbcConnectionPoolProviderProxy(JdbcConnectionConfig
jdbcConfig, int queueIndex) {
+
+ this.jdbcConfig = jdbcConfig;
+ this.poolManager = new DsqlConnectionPoolManager(jdbcConfig);
+ this.queueIndex = queueIndex;
+ }
+
+ @Override
+ public Connection getConnection() {
+ return poolManager.getConnection(queueIndex);
+ }
+
+ @Override
+ public boolean isConnectionValid() throws SQLException {
+ return poolManager.containsConnection(queueIndex)
+ && poolManager
+ .getConnection(queueIndex)
+
.isValid(jdbcConfig.getConnectionCheckTimeoutSeconds());
+ }
+
+ @Override
+ public Connection getOrEstablishConnection() {
+ return poolManager.getConnection(queueIndex);
+ }
+
+ @Override
+ public void closeConnection() {
+ if (poolManager.containsConnection(queueIndex)) {
+ try {
+ poolManager.remove(queueIndex).close();
+ } catch (SQLException e) {
+ log.warn("JDBC connection close failed.", e);
+ }
+ }
+ }
+
+ @Override
+ public Connection reestablishConnection() {
+ closeConnection();
+ return getOrEstablishConnection();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlConnectionPoolManager.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlConnectionPoolManager.java
new file mode 100644
index 0000000000..2fbd46fedb
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlConnectionPoolManager.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql;
+
+import org.apache.seatunnel.shade.com.zaxxer.hikari.HikariDataSource;
+
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dsql.DsqlUtilities;
+import software.amazon.awssdk.services.dsql.model.GenerateAuthTokenRequest;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Getter
+public class DsqlConnectionPoolManager {
+
+ private HikariDataSource connectionPool;
+ private Map<Integer, Connection> connectionMap;
+ private AwsCredentialsProvider provider;
+ private DsqlUtilities dsqlUtilities;
+ private JdbcConnectionConfig jdbcConfig;
+ private ScheduledExecutorService tokenRefreshExecutor;
+
+ DsqlConnectionPoolManager(JdbcConnectionConfig jdbcConfig) {
+ initAWSInfo(jdbcConfig);
+ this.connectionPool = new HikariDataSource();
+ this.connectionPool.setIdleTimeout(30 * 1000);
+ this.connectionPool.setMaximumPoolSize(10);
+ this.connectionPool.setJdbcUrl(jdbcConfig.getUrl());
+ this.connectionPool.setPassword(generateAuthToken(getDBHost()));
+ this.connectionPool.setDriverClassName(jdbcConfig.getDriverName());
+ this.connectionPool.setUsername(jdbcConfig.getUsername().get());
+ this.connectionPool.setAutoCommit(jdbcConfig.isAutoCommit());
+ this.connectionMap = new ConcurrentHashMap<>();
+ this.tokenRefreshExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ r -> {
+ Thread t = new Thread(r, "dsql-token-refresh");
+ t.setDaemon(true);
+ return t;
+ });
+ // Schedule token refresh every 10 minutes (tokens are valid for 15
minutes)
+ tokenRefreshExecutor.scheduleAtFixedRate(this::resetPassword, 10, 10,
TimeUnit.MINUTES);
+ }
+
+ public void initAWSInfo(JdbcConnectionConfig jdbcConfig) {
+ this.jdbcConfig = jdbcConfig;
+ this.provider =
+ new AwsCredentialsProvider() {
+ @Override
+ public AwsCredentials resolveCredentials() {
+ return AwsBasicCredentials.create(
+ jdbcConfig.getAccessKeyId(),
jdbcConfig.getSecretAccessKey());
+ }
+ };
+ this.dsqlUtilities =
+ this.dsqlUtilities =
+ DsqlUtilities.builder()
+ .region(Region.of(jdbcConfig.getRegion()))
+ .credentialsProvider(provider)
+ .build();
+ }
+
+ private void resetPassword() {
+
connectionPool.getHikariConfigMXBean().setPassword(generateAuthToken(getDBHost()));
+ log.warn("Reset password for dsql connection successfully!");
+ }
+
+ private String getDBHost() {
+ String url = jdbcConfig.getUrl();
+ JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(url);
+ return urlInfo.getHost();
+ }
+
+ private String generateAuthToken(String clusterEndpoint) {
+
+ GenerateAuthTokenRequest tokenGenerator =
+ GenerateAuthTokenRequest.builder()
+ .hostname(clusterEndpoint)
+ .region(Region.of(jdbcConfig.getRegion()))
+ .credentialsProvider(this.provider)
+ .build();
+
+ if ("admin".equals(jdbcConfig.getUsername().get())) {
+ return
dsqlUtilities.generateDbConnectAdminAuthToken(tokenGenerator);
+ } else {
+ return dsqlUtilities.generateDbConnectAuthToken(tokenGenerator);
+ }
+ }
+
+ public Connection getConnection(int index) {
+ return connectionMap.computeIfAbsent(
+ index,
+ i -> {
+ try {
+ return connectionPool.getConnection();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ public boolean containsConnection(int index) {
+ return connectionMap.containsKey(index);
+ }
+
+ public Connection remove(int index) {
+ return connectionMap.remove(index);
+ }
+
+ public String getPoolName() {
+ return connectionPool.getPoolName();
+ }
+
+ public void close() {
+ if (!connectionPool.isClosed()) {
+ connectionPool.close();
+ }
+ if (!tokenRefreshExecutor.isShutdown()) {
+ tokenRefreshExecutor.shutdownNow();
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlDialect.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlDialect.java
new file mode 100644
index 0000000000..928a372b61
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlDialect.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect;
+
+public class DsqlDialect extends PostgresDialect {
+
+ public DsqlDialect(String fieldIde) {
+ this.fieldIde = fieldIde;
+ }
+
+ @Override
+ public String dialectName() {
+ return DatabaseIdentifier.DSQL;
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter() {
+ return new DsqlJdbcRowConverter();
+ }
+
+ @Override
+ public JdbcConnectionProvider getJdbcConnectionProvider(
+ JdbcConnectionConfig jdbcConnectionConfig) {
+ return new DsqlJdbcConnectionProvider(jdbcConnectionConfig);
+ }
+
+ @Override
+ public String tableIdentifier(String database, String tableName) {
+
+ return quoteIdentifier(tableName);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlDialectFactory.java
new file mode 100644
index 0000000000..5b60a0c2a8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlDialectFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+import javax.annotation.Nonnull;
+
+import java.util.regex.Pattern;
+
+@AutoService(JdbcDialectFactory.class)
+public class DsqlDialectFactory implements JdbcDialectFactory {
+
+ private static final Pattern DSQL_PATTERN =
Pattern.compile(".*dsql\\.[a-z0-9-]+\\.on\\.aws.*");
+
+ @Override
+ public String dialectFactoryName() {
+ return DatabaseIdentifier.DSQL;
+ }
+
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:postgresql:") && containsDsql(url);
+ }
+
+ @Override
+ public JdbcDialect create() {
+ throw new UnsupportedOperationException(
+ "Can't create JdbcDialect without compatible mode for Dsql");
+ }
+
+ @Override
+ public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde)
{
+
+ return new DsqlDialect(fieldIde);
+ }
+
+ private boolean containsDsql(String url) {
+ return DSQL_PATTERN.matcher(url).matches();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlJdbcConnectionProvider.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlJdbcConnectionProvider.java
new file mode 100644
index 0000000000..d4b6be8e17
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlJdbcConnectionProvider.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql;
+
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
+
+import lombok.NonNull;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dsql.DsqlUtilities;
+import software.amazon.awssdk.services.dsql.model.GenerateAuthTokenRequest;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.SQLException;
+import java.util.Properties;
+
+public class DsqlJdbcConnectionProvider extends SimpleJdbcConnectionProvider {
+
+ private AwsCredentialsProvider provider;
+ private DsqlUtilities dsqlUtilities;
+
+ public DsqlJdbcConnectionProvider(@NonNull JdbcConnectionConfig
jdbcConfig) {
+ super(jdbcConfig);
+ this.provider =
+ new AwsCredentialsProvider() {
+ @Override
+ public AwsCredentials resolveCredentials() {
+ return AwsBasicCredentials.create(
+ jdbcConfig.getAccessKeyId(),
jdbcConfig.getSecretAccessKey());
+ }
+ };
+ this.dsqlUtilities =
+ DsqlUtilities.builder()
+ .region(Region.of(jdbcConfig.getRegion()))
+ .credentialsProvider(provider)
+ .build();
+ }
+
+ @Override
+ public Connection getOrEstablishConnection() throws SQLException,
ClassNotFoundException {
+ if (isConnectionValid()) {
+ return connection;
+ }
+ Driver driver = getLoadedDriver();
+ Properties info = new Properties();
+ if (jdbcConfig.getUsername().isPresent()) {
+ info.setProperty("user", jdbcConfig.getUsername().get());
+ }
+ String url = jdbcConfig.getUrl();
+ JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(url);
+ info.setProperty("password", generateAuthToken(urlInfo.getHost()));
+
+ info.putAll(jdbcConfig.getProperties());
+
+ connection = driver.connect(url, info);
+ if (connection == null) {
+ // Throw same exception as DriverManager.getConnection when no
driver found to match
+ // caller expectation.
+ throw new JdbcConnectorException(
+ JdbcConnectorErrorCode.NO_SUITABLE_DRIVER,
+ "No suitable driver found for " + url);
+ }
+
+ connection.setAutoCommit(jdbcConfig.isAutoCommit());
+
+ return connection;
+ }
+
+ private String generateAuthToken(String clusterEndpoint) {
+ JdbcConnectionConfig jdbcConfig = super.getJdbcConfig();
+ GenerateAuthTokenRequest tokenGenerator =
+ GenerateAuthTokenRequest.builder()
+ .hostname(clusterEndpoint)
+ .region(Region.of(jdbcConfig.getRegion()))
+ .credentialsProvider(this.provider)
+ .build();
+
+ if ("admin".equals(jdbcConfig.getUsername().get())) {
+ return
dsqlUtilities.generateDbConnectAdminAuthToken(tokenGenerator);
+ } else {
+ return dsqlUtilities.generateDbConnectAuthToken(tokenGenerator);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlJdbcRowConverter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlJdbcRowConverter.java
new file mode 100644
index 0000000000..3f2a3ad4b5
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/dsql/DsqlJdbcRowConverter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresJdbcRowConverter;
+
+public class DsqlJdbcRowConverter extends PostgresJdbcRowConverter {
+
+ @Override
+ public String converterName() {
+ return DatabaseIdentifier.DSQL;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index d367acb006..49d8b47ef6 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -32,7 +32,9 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErr
import
org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionPoolProviderProxy;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dsql.DdsqlJdbcConnectionPoolProviderProxy;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
@@ -106,11 +108,17 @@ public class JdbcSinkWriter extends
AbstractJdbcSinkWriter<ConnectionPoolManager
MultiTableResourceManager<ConnectionPoolManager>
multiTableResourceManager,
int queueIndex) {
connectionProvider.closeConnection();
- this.connectionProvider =
- new SimpleJdbcConnectionPoolProviderProxy(
- multiTableResourceManager.getSharedResource().get(),
- jdbcSinkConfig.getJdbcConnectionConfig(),
- queueIndex);
+ if (this.dialect.dialectName().equals(DatabaseIdentifier.DSQL)) {
+ this.connectionProvider =
+ new DdsqlJdbcConnectionPoolProviderProxy(
+ jdbcSinkConfig.getJdbcConnectionConfig(),
queueIndex);
+ } else {
+ this.connectionProvider =
+ new SimpleJdbcConnectionPoolProviderProxy(
+
multiTableResourceManager.getSharedResource().get(),
+ jdbcSinkConfig.getJdbcConnectionConfig(),
+ queueIndex);
+ }
this.outputFormat =
new JdbcOutputFormatBuilder(
dialect,