This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new bbf643772e [Feature][connector-v2]Support opengauss jdbc connector
using opengauss driver. (#7622)
bbf643772e is described below
commit bbf643772e399b287e152613c482dfa6430f58aa
Author: luckyLJY <[email protected]>
AuthorDate: Wed Oct 16 18:49:23 2024 +0800
[Feature][connector-v2]Support opengauss jdbc connector using opengauss
driver. (#7622)
Co-authored-by: lucky_ljy <[email protected]>
---
docs/en/connector-v2/sink/Jdbc.md | 5 +-
docs/en/connector-v2/source/Jdbc.md | 5 +-
docs/zh/connector-v2/sink/Jdbc.md | 41 +--
seatunnel-connectors-v2/connector-jdbc/pom.xml | 12 +-
.../jdbc/catalog/opengauss/OpenGaussCatalog.java | 44 +++
.../catalog/opengauss/OpenGaussCatalogFactory.java | 62 ++++
.../jdbc/internal/dialect/DatabaseIdentifier.java | 1 +
.../dialect/opengauss/OpenGaussDialectFactory.java | 31 ++
.../connectors/seatunnel/jdbc/JdbcOpenGaussIT.java | 332 +++++++++++++++++++++
.../resources/jdbc_opengauss_source_and_sink.conf | 67 +++++
10 files changed, 574 insertions(+), 26 deletions(-)
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index 1ddbdd507d..8df5f12bfa 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -226,7 +226,7 @@ In the case of is_exactly_once = "true", Xa transactions
are used. This requires
there are some reference value for params above.
-| datasource | driver |
url |
xa_data_source_class_name |
maven
|
+| datasource | driver |
url |
xa_data_source_class_name | maven
|
|-------------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|
| MySQL | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
com.mysql.cj.jdbc.MysqlXADataSource |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
| PostgreSQL | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/postgres |
org.postgresql.xa.PGXADataSource |
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
@@ -235,7 +235,7 @@ there are some reference value for params above.
| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver |
jdbc:sqlserver://localhost:1433 |
com.microsoft.sqlserver.jdbc.SQLServerXADataSource |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
|
| Oracle | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@localhost:1521/xepdb1 |
oracle.jdbc.xa.OracleXADataSource |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
|
| sqlite | org.sqlite.JDBC |
jdbc:sqlite:test.db | /
|
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc
|
-| GBase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test | /
|
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
|
+| GBase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test | /
|
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
|
| StarRocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
com.ibm.db2.jcc.DB2XADataSource |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
| saphana | com.sap.db.jdbc.Driver |
jdbc:sap://localhost:39015 | /
|
https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc
|
@@ -248,6 +248,7 @@ there are some reference value for params above.
| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar
|
| xugu | com.xugu.cloudjdbc.Driver |
jdbc:xugu://localhost:5138 | /
|
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar
|
| InterSystems IRIS | com.intersystems.jdbc.IRISDriver |
jdbc:IRIS://localhost:1972/%SYS | /
|
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
|
+| opengauss | org.opengauss.Driver |
jdbc:opengauss://localhost:5432/postgres | /
|
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
|
## Example
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 27b3d87558..f2e7486e24 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -113,7 +113,7 @@ The JDBC Source connector supports parallel reading of data
from tables. SeaTunn
there are some reference value for params above.
-| datasource | driver |
url |
maven
|
+| datasource | driver |
url | maven
|
|-------------------|-----------------------------------------------------|------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------|
| mysql | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
| postgresql | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/postgres |
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
@@ -122,7 +122,7 @@ there are some reference value for params above.
| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver |
jdbc:sqlserver://localhost:1433 |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
|
| oracle | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@localhost:1521/xepdb1 |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
|
| sqlite | org.sqlite.JDBC |
jdbc:sqlite:test.db |
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc
|
-| gbase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test |
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
|
+| gbase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test |
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
|
| starrocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver |
"jdbc:ots:http s://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" |
https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc
|
@@ -137,6 +137,7 @@ there are some reference value for params above.
| Hive | org.apache.hive.jdbc.HiveDriver |
jdbc:hive2://localhost:10000 |
https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar
|
| xugu | com.xugu.cloudjdbc.Driver |
jdbc:xugu://localhost:5138 |
https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar
|
| InterSystems IRIS | com.intersystems.jdbc.IRISDriver |
jdbc:IRIS://localhost:1972/%SYS |
https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar
|
+| opengauss | org.opengauss.Driver |
jdbc:opengauss://localhost:5432/postgres |
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
|
## Example
diff --git a/docs/zh/connector-v2/sink/Jdbc.md
b/docs/zh/connector-v2/sink/Jdbc.md
index e1ab422952..6a7253da61 100644
--- a/docs/zh/connector-v2/sink/Jdbc.md
+++ b/docs/zh/connector-v2/sink/Jdbc.md
@@ -216,26 +216,27 @@ Sink插件常用参数,请参考 [Sink常用选项](../sink-common-options.md)
附录参数仅提供参考
-| 数据源 | driver |
url |
xa_data_source_class_name |
maven |
-|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
-| MySQL | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
com.mysql.cj.jdbc.MysqlXADataSource |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
-| PostgreSQL | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/postgres |
org.postgresql.xa.PGXADataSource |
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
-| DM | dm.jdbc.driver.DmDriver |
jdbc:dm://localhost:5236 |
dm.jdbc.driver.DmdbXADataSource |
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18
|
-| Phoenix | org.apache.phoenix.queryserver.client.Driver |
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /
|
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
|
-| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver |
jdbc:sqlserver://localhost:1433 |
com.microsoft.sqlserver.jdbc.SQLServerXADataSource |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
|
-| Oracle | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@localhost:1521/xepdb1 |
oracle.jdbc.xa.OracleXADataSource |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
|
-| sqlite | org.sqlite.JDBC |
jdbc:sqlite:test.db | /
|
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc
|
-| GBase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test | /
|
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
|
-| StarRocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
-| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
com.ibm.db2.jcc.DB2XADataSource |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
-| saphana | com.sap.db.jdbc.Driver |
jdbc:sap://localhost:39015 | /
|
https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc
|
-| Doris | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
-| teradata | com.teradata.jdbc.TeraDriver |
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | /
|
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc
|
-| Redshift | com.amazon.redshift.jdbc42.Driver |
jdbc:redshift://localhost:5439/testdb |
com.amazon.redshift.xa.RedshiftXADataSource |
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42
|
-| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver |
jdbc:snowflake://<account_name>.snowflakecomputing.com | /
|
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc
|
-| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 | /
|
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
-| Kingbase | com.kingbase8.Driver |
jdbc:kingbase8://localhost:54321/db_test | /
|
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
|
-| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar
|
+| 数据源 | driver |
url |
xa_data_source_class_name | maven
|
+|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|------------------------------------------------------------------------------------------------------|
+| MySQL | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test |
com.mysql.cj.jdbc.MysqlXADataSource |
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| PostgreSQL | org.postgresql.Driver |
jdbc:postgresql://localhost:5432/postgres |
org.postgresql.xa.PGXADataSource |
https://mvnrepository.com/artifact/org.postgresql/postgresql
|
+| DM | dm.jdbc.driver.DmDriver |
jdbc:dm://localhost:5236 |
dm.jdbc.driver.DmdbXADataSource |
https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18
|
+| Phoenix | org.apache.phoenix.queryserver.client.Driver |
jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | /
|
https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client
|
+| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver |
jdbc:sqlserver://localhost:1433 |
com.microsoft.sqlserver.jdbc.SQLServerXADataSource |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
|
+| Oracle | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@localhost:1521/xepdb1 |
oracle.jdbc.xa.OracleXADataSource |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
|
+| sqlite | org.sqlite.JDBC |
jdbc:sqlite:test.db | /
|
https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc
|
+| GBase8a | com.gbase.jdbc.Driver |
jdbc:gbase://e2e_gbase8aDb:5258/test | /
|
https://cdn.gbase.cn/products/30/p5CiVwXBKQYIUGN8ecHvk/gbase-connector-java-9.5.0.7-build1-bin.jar
|
+| StarRocks | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| db2 | com.ibm.db2.jcc.DB2Driver |
jdbc:db2://localhost:50000/testdb |
com.ibm.db2.jcc.DB2XADataSource |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4
|
+| saphana | com.sap.db.jdbc.Driver |
jdbc:sap://localhost:39015 | /
|
https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc
|
+| Doris | com.mysql.cj.jdbc.Driver |
jdbc:mysql://localhost:3306/test | /
|
https://mvnrepository.com/artifact/mysql/mysql-connector-java
|
+| teradata | com.teradata.jdbc.TeraDriver |
jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | /
|
https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc
|
+| Redshift | com.amazon.redshift.jdbc42.Driver |
jdbc:redshift://localhost:5439/testdb |
com.amazon.redshift.xa.RedshiftXADataSource |
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42
|
+| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver |
jdbc:snowflake://<account_name>.snowflakecomputing.com | /
|
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc
|
+| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 | /
|
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
+| Kingbase | com.kingbase8.Driver |
jdbc:kingbase8://localhost:54321/db_test | /
|
https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar
|
+| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.11/oceanbase-client-2.4.11.jar
|
+| opengauss | org.opengauss.Driver |
jdbc:opengauss://localhost:5432/postgres | /
|
https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar
|
## 示例
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml
b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 7b4199c462..a6be0dd03b 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -53,6 +53,7 @@
<xugu.jdbc.version>12.2.0</xugu.jdbc.version>
<iris.jdbc.version>3.0.0</iris.jdbc.version>
<tikv.version>3.2.0</tikv.version>
+ <opengauss.jdbc.version>5.1.0-og</opengauss.jdbc.version>
</properties>
<dependencyManagement>
@@ -203,11 +204,15 @@
<version>${iris.jdbc.version}</version>
<scope>provided</scope>
</dependency>
-
<dependency>
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
<version>${tikv.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.opengauss</groupId>
+ <artifactId>opengauss-jdbc</artifactId>
+ <version>${opengauss.jdbc.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
@@ -316,11 +321,14 @@
<groupId>com.intersystems</groupId>
<artifactId>intersystems-jdbc</artifactId>
</dependency>
-
<dependency>
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.opengauss</groupId>
+ <artifactId>opengauss-jdbc</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java
new file mode 100644
index 0000000000..6a8eab5010
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalog.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.opengauss;
+
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.psql.PostgresCatalog;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
+
+import java.sql.Connection;
+
+/**
+ * Catalog for openGauss. It extends {@link PostgresCatalog} and defines no catalog operations of
+ * its own, so every catalog operation used through this class is the inherited one.
+ */
+@Slf4j
+public class OpenGaussCatalog extends PostgresCatalog {
+
+ /**
+ * Creates the catalog by passing every argument, unchanged and in the same order, to the
+ * {@link PostgresCatalog} constructor.
+ *
+ * @param catalogName name given to this catalog
+ * @param username user name forwarded to the parent catalog
+ * @param pwd password forwarded to the parent catalog
+ * @param urlInfo parsed JDBC URL forwarded to the parent catalog
+ * @param defaultSchema schema forwarded to the parent catalog
+ */
+ public OpenGaussCatalog(
+ String catalogName,
+ String username,
+ String pwd,
+ JdbcUrlUtil.UrlInfo urlInfo,
+ String defaultSchema) {
+ super(catalogName, username, pwd, urlInfo, defaultSchema);
+ }
+
+ /**
+ * Stores {@code connection} in {@code connectionMap} under the key {@code url}. Marked
+ * {@link VisibleForTesting}, i.e. exposed for tests rather than for production callers.
+ *
+ * <p>NOTE(review): {@code connectionMap} is not declared in this class - presumably inherited
+ * from a parent catalog class; confirm.
+ *
+ * @param url key under which the connection is stored
+ * @param connection connection to store
+ */
+ @VisibleForTesting
+ public void setConnection(String url, Connection connection) {
+ this.connectionMap.put(url, connection);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java
new file mode 100644
index 0000000000..bff96ff6d3
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/opengauss/OpenGaussCatalogFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.opengauss;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.OptionValidationException;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.factory.CatalogFactory;
+import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+
+import com.google.auto.service.AutoService;
+
+import java.util.Optional;
+
+@AutoService(Factory.class)
+public class OpenGaussCatalogFactory implements CatalogFactory {
+
+ @Override
+ public String factoryIdentifier() {
+ return DatabaseIdentifier.OPENGAUSS;
+ }
+
+ @Override
+ public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
+ String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL);
+ JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase);
+ Optional<String> defaultDatabase = urlInfo.getDefaultDatabase();
+ if (!defaultDatabase.isPresent()) {
+ throw new OptionValidationException(JdbcCatalogOptions.BASE_URL);
+ }
+ return new OpenGaussCatalog(
+ catalogName,
+ options.get(JdbcCatalogOptions.USERNAME),
+ options.get(JdbcCatalogOptions.PASSWORD),
+ urlInfo,
+ options.get(JdbcCatalogOptions.SCHEMA));
+ }
+
+ @Override
+ public OptionRule optionRule() {
+ return JdbcCatalogOptions.BASE_RULE.build();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
index 45f849c28b..e2a32b4f3f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java
@@ -42,4 +42,5 @@ public class DatabaseIdentifier {
public static final String XUGU = "XUGU";
public static final String IRIS = "IRIS";
public static final String INCEPTOR = "Inceptor";
+ public static final String OPENGAUSS = "OpenGauss";
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/opengauss/OpenGaussDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/opengauss/OpenGaussDialectFactory.java
new file mode 100644
index 0000000000..b1ceed51e9
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/opengauss/OpenGaussDialectFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.opengauss;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialectFactory;
+
+import com.google.auto.service.AutoService;
+
+/**
+ * Dialect factory for openGauss. It extends {@link PostgresDialectFactory} and overrides only the
+ * URL matching below; everything else is inherited from the PostgreSQL factory.
+ */
+@AutoService(JdbcDialectFactory.class)
+public class OpenGaussDialectFactory extends PostgresDialectFactory {
+
+ /**
+ * Reports whether this factory handles the given JDBC URL.
+ *
+ * @param url JDBC URL to test; a null value results in a NullPointerException
+ * @return true when the URL starts with the prefix {@code jdbc:opengauss:}
+ */
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:opengauss:");
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java
new file mode 100644
index 0000000000..5d2b8b19dc
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOpenGaussIT.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.ConstraintKey;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.utils.JdbcUrlUtil;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.opengauss.OpenGaussCatalog;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+@Slf4j
+public class JdbcOpenGaussIT extends AbstractJdbcIT {
+ protected static final String OPENGAUSS_IMAGE =
"opengauss/opengauss:5.0.0";
+
+ private static final String OPEN_GAUSS_ALIASES = "e2e_OpenGauss";
+ private static final String DRIVER_CLASS = "org.opengauss.Driver";
+ private static final int OPEN_GAUSS_PORT = 5432;
+ private static final String OPEN_GAUSS_URL = "jdbc:opengauss://" + HOST +
":%s/%s";
+ private static final String USERNAME = "gaussdb";
+ private static final String PASSWORD = "openGauss@123";
+ private static final String DATABASE = "postgres";
+ private static final String SCHEMA = "public";
+ private static final String SOURCE_TABLE = "gs_e2e_source_table";
+ private static final String SINK_TABLE = "gs_e2e_sink_table";
+ private static final String CATALOG_TABLE = "e2e_table_catalog";
+ private static final Integer GEN_ROWS = 100;
+ private static final List<String> CONFIG_FILE =
+ Lists.newArrayList("/jdbc_opengauss_source_and_sink.conf");
+
+ private static final String CREATE_SQL =
+ "CREATE TABLE IF NOT EXISTS %s (\n"
+ + " gid SERIAL PRIMARY KEY,\n"
+ + " uuid_col UUID,\n"
+ + " text_col TEXT,\n"
+ + " varchar_col VARCHAR(255),\n"
+ + " char_col CHAR(10),\n"
+ + " boolean_col bool,\n"
+ + " smallint_col int2,\n"
+ + " integer_col int4,\n"
+ + " bigint_col BIGINT,\n"
+ + " decimal_col DECIMAL(10, 2),\n"
+ + " numeric_col NUMERIC(8, 4),\n"
+ + " real_col float4,\n"
+ + " double_precision_col float8,\n"
+ + " smallserial_col SMALLSERIAL,\n"
+ + " bigserial_col BIGSERIAL,\n"
+ + " date_col DATE,\n"
+ + " timestamp_col TIMESTAMP,\n"
+ + " bpchar_col BPCHAR(10),\n"
+ + " age INT NOT null\n"
+ + ");";
+
+ private static final String[] fieldNames =
+ new String[] {
+ "gid",
+ "uuid_col",
+ "text_col",
+ "varchar_col",
+ "char_col",
+ "boolean_col",
+ "smallint_col",
+ "integer_col",
+ "bigint_col",
+ "decimal_col",
+ "numeric_col",
+ "real_col",
+ "double_precision_col",
+ "smallserial_col",
+ "bigserial_col",
+ "date_col",
+ "timestamp_col",
+ "bpchar_col",
+ "age"
+ };
+
+ /**
+ * Container hook that runs one bash command inside the given container: it creates
+ * /tmp/seatunnel/plugins/Jdbc/lib, changes into it and downloads the jar at driverUrl() with
+ * "curl -O". The hook then asserts that the command exited with code 0, using the captured
+ * stderr as the assertion message.
+ */
+ @TestContainerExtension
+ protected final ContainerExtendedFactory extendedFactory =
+ container -> {
+ Container.ExecResult extraCommands =
+ container.execInContainer(
+ "bash",
+ "-c",
+ "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib &&
cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O "
+ + driverUrl());
+ Assertions.assertEquals(0, extraCommands.getExitCode(),
extraCommands.getStderr());
+ };
+
+    /**
+     * Reads the source table definition through the catalog, creates a table from it at the
+     * catalog table path and drops it again, asserting the table's existence after each step.
+     * Returns immediately when no catalog is set.
+     */
+    @Test
+    @Override
+    public void testCatalog() {
+        if (catalog == null) {
+            return;
+        }
+        TablePath origin =
+                new TablePath(
+                        jdbcCase.getDatabase(),
+                        jdbcCase.getSchema(),
+                        jdbcCase.getSourceTable());
+        TablePath copy =
+                new TablePath(
+                        jdbcCase.getCatalogDatabase(),
+                        jdbcCase.getCatalogSchema(),
+                        jdbcCase.getCatalogTable());
+
+        // Create the copy from the source definition and verify it is visible.
+        catalog.createTable(copy, catalog.getTable(origin), false);
+        Assertions.assertTrue(catalog.tableExists(copy));
+
+        // Remove the copy and verify it is gone.
+        catalog.dropTable(copy, false);
+        Assertions.assertFalse(catalog.tableExists(copy));
+    }
+
+    /**
+     * Creates a copy of the source table twice - first with {@code createIndex} set to false, then
+     * set to true - and checks through {@link #hasIndex} that the copy reports no keys in the
+     * first case and keys in the second. The target table is dropped before each creation and
+     * once more at the end.
+     */
+    @Test
+    public void testCreateIndex() {
+        String databaseName = jdbcCase.getDatabase();
+        // Use the class constants instead of repeating the schema and source table literals.
+        TablePath sourceTablePath = TablePath.of(databaseName, SCHEMA, SOURCE_TABLE);
+        TablePath targetTablePath = TablePath.of(databaseName, SCHEMA, "gs_ide_sink_table_2");
+        OpenGaussCatalog openGaussCatalog = (OpenGaussCatalog) catalog;
+        CatalogTable catalogTable = openGaussCatalog.getTable(sourceTablePath);
+        dropTableWithAssert(openGaussCatalog, targetTablePath, true);
+        // not create index
+        createIndexOrNot(openGaussCatalog, targetTablePath, catalogTable, false);
+        Assertions.assertFalse(hasIndex(openGaussCatalog, targetTablePath));
+
+        dropTableWithAssert(openGaussCatalog, targetTablePath, true);
+        // create index
+        createIndexOrNot(openGaussCatalog, targetTablePath, catalogTable, true);
+        Assertions.assertTrue(hasIndex(openGaussCatalog, targetTablePath));
+
+        dropTableWithAssert(openGaussCatalog, targetTablePath, true);
+    }
+
+    /**
+     * Tells whether the table at {@code targetTablePath} reports any key in its schema.
+     *
+     * @param catalog catalog used to load the table
+     * @param targetTablePath path of the table to inspect
+     * @return true when the schema has a primary key with a non-blank name, or a non-empty list of
+     *     constraint keys
+     */
+    protected boolean hasIndex(Catalog catalog, TablePath targetTablePath) {
+        TableSchema loadedSchema = catalog.getTable(targetTablePath).getTableSchema();
+        PrimaryKey pk = loadedSchema.getPrimaryKey();
+        List<ConstraintKey> constraints = loadedSchema.getConstraintKeys();
+        boolean namedPrimaryKey = pk != null && StringUtils.isNotBlank(pk.getPrimaryKey());
+        // Short-circuit: the constraint list is only consulted when no named primary key exists.
+        return namedPrimaryKey || !constraints.isEmpty();
+    }
+
+ /**
+ * Drops the table at {@code targetTablePath} and asserts that the catalog no longer reports it
+ * as existing.
+ *
+ * @param openGaussCatalog catalog to operate on
+ * @param targetTablePath path of the table to drop
+ * @param ignoreIfNotExists passed through unchanged to {@code dropTable}
+ */
+ private void dropTableWithAssert(
+ OpenGaussCatalog openGaussCatalog,
+ TablePath targetTablePath,
+ boolean ignoreIfNotExists) {
+ openGaussCatalog.dropTable(targetTablePath, ignoreIfNotExists);
+ Assertions.assertFalse(openGaussCatalog.tableExists(targetTablePath));
+ }
+
+    /**
+     * Creates the table at {@code targetTablePath} from {@code catalogTable} and asserts that the
+     * catalog reports it as existing afterwards.
+     *
+     * @param openGaussCatalog catalog to operate on
+     * @param targetTablePath table to create
+     * @param catalogTable definition the new table is created from
+     * @param createIndex forwarded unchanged as the last argument of {@code createTable}
+     */
+    private void createIndexOrNot(
+            OpenGaussCatalog openGaussCatalog,
+            TablePath targetTablePath,
+            CatalogTable catalogTable,
+            boolean createIndex) {
+        // The third argument (ignoreIfExists) is always false here.
+        openGaussCatalog.createTable(targetTablePath, catalogTable, false, createIndex);
+        boolean present = openGaussCatalog.tableExists(targetTablePath);
+        Assertions.assertTrue(present);
+    }
+
+    /**
+     * Assembles the {@link JdbcCase} describing this openGauss test: image, connection settings,
+     * table names, DDL, job config file, generated rows and the matching insert statement.
+     *
+     * @return the fully populated case definition
+     */
+    @Override
+    JdbcCase getJdbcCase() {
+        Pair<String[], List<SeaTunnelRow>> generated = initTestData();
+        String insertStatement = insertTable(SCHEMA, SOURCE_TABLE, generated.getKey());
+        String url = String.format(OPEN_GAUSS_URL, OPEN_GAUSS_PORT, DATABASE);
+
+        // NOTE(review): initContainer() passes GS_PASSWORD to the container itself; confirm
+        // whether these entries are consumed anywhere.
+        Map<String, String> env = new HashMap<>();
+        env.put("OPEN_GAUSS_PASSWORD", PASSWORD);
+        env.put("APP_USER", USERNAME);
+        env.put("APP_USER_PASSWORD", PASSWORD);
+
+        return JdbcCase.builder()
+                .dockerImage(OPENGAUSS_IMAGE)
+                .networkAliases(OPEN_GAUSS_ALIASES)
+                .containerEnv(env)
+                .driverClass(DRIVER_CLASS)
+                .host(HOST)
+                .port(OPEN_GAUSS_PORT)
+                .localPort(OPEN_GAUSS_PORT)
+                .jdbcTemplate(OPEN_GAUSS_URL)
+                .jdbcUrl(url)
+                .userName(USERNAME)
+                .password(PASSWORD)
+                .database(DATABASE)
+                .schema(SCHEMA)
+                .sourceTable(SOURCE_TABLE)
+                .sinkTable(SINK_TABLE)
+                .catalogDatabase(DATABASE)
+                .catalogSchema(SCHEMA)
+                .catalogTable(CATALOG_TABLE)
+                .createSql(CREATE_SQL)
+                .configFile(CONFIG_FILE)
+                .insertSql(insertStatement)
+                .testData(generated)
+                .build();
+    }
+
+    /**
+     * Returns the Maven Central location of the openGauss JDBC driver jar (version 5.1.0-og).
+     *
+     * @return download URL of the driver jar
+     */
+    @Override
+    String driverUrl() {
+        final String driverJarUrl =
+                "https://repo1.maven.org/maven2/org/opengauss/opengauss-jdbc/5.1.0-og/opengauss-jdbc-5.1.0-og.jar";
+        return driverJarUrl;
+    }
+
+    /**
+     * Loads the JDBC driver class by delegating to the superclass's {@code
+     * loadDriverClassFromUrl()}.
+     *
+     * @return the driver class returned by the superclass
+     */
+    @Override
+    protected Class<?> loadDriverClass() {
+        Class<?> driver = super.loadDriverClassFromUrl();
+        return driver;
+    }
+
+    /**
+     * Generates the rows used as test data.
+     *
+     * <p>{@code GEN_ROWS} rows are produced. Most values are derived from the row index; the
+     * second value is a random UUID, so the data differs between invocations.
+     *
+     * @return the {@code fieldNames} of this class paired with the generated rows
+     */
+    @Override
+    Pair<String[], List<SeaTunnelRow>> initTestData() {
+        List<SeaTunnelRow> rows = new ArrayList<>();
+        // A primitive counter avoids unboxing/reboxing an Integer on every comparison and
+        // increment; the values still autobox to the same Integer/Long objects inside the row.
+        for (int i = 0; i < GEN_ROWS; i++) {
+            SeaTunnelRow row =
+                    new SeaTunnelRow(
+                            new Object[] {
+                                i,
+                                UUID.randomUUID(),
+                                String.valueOf(i),
+                                String.valueOf(i),
+                                String.valueOf(i),
+                                i % 2 == 0,
+                                i,
+                                i,
+                                Long.valueOf(i),
+                                BigDecimal.valueOf(i * 10.0),
+                                BigDecimal.valueOf(i * 0.01),
+                                Float.parseFloat("1.1"),
+                                Double.parseDouble("1.111"),
+                                i,
+                                Long.valueOf(i),
+                                // Midnight of the date, expressed as a LocalDateTime.
+                                LocalDate.of(2022, 1, 1).atStartOfDay(),
+                                LocalDateTime.of(2022, 1, 1, 10, 0),
+                                "Testing",
+                                i
+                            });
+            rows.add(row);
+        }
+
+        return Pair.of(fieldNames, rows);
+    }
+
+    /**
+     * Wraps an identifier in double quotes.
+     *
+     * @param field identifier to quote; embedded quotes are not escaped
+     * @return {@code field} surrounded by {@code "} characters
+     */
+    @Override
+    public String quoteIdentifier(String field) {
+        final String quote = "\"";
+        return quote + field + quote;
+    }
+
+    /**
+     * Clears a table by delegating to the two-argument {@code clearTable(schema, table)}.
+     *
+     * @param database not used by this override
+     * @param schema schema of the table to clear
+     * @param table name of the table to clear
+     */
+    @Override
+    protected void clearTable(String database, String schema, String table) {
+        this.clearTable(schema, table);
+    }
+
+    /**
+     * Builds the table reference by delegating to the two-argument {@code
+     * buildTableInfoWithSchema(schema, table)}.
+     *
+     * @param database not used by this override
+     * @param schema schema of the table
+     * @param table name of the table
+     * @return whatever the two-argument overload returns for {@code schema} and {@code table}
+     */
+    @Override
+    protected String buildTableInfoWithSchema(String database, String schema, String table) {
+        String tableInfo = this.buildTableInfoWithSchema(schema, table);
+        return tableInfo;
+    }
+
+    /**
+     * Configures the openGauss container: shared network and alias, the {@code GS_PASSWORD}
+     * environment variable, SLF4J log forwarding, and a fixed host port binding of {@code
+     * OPEN_GAUSS_PORT} to the same port inside the container.
+     *
+     * @return the configured container; this method does not start it
+     */
+    @Override
+    GenericContainer<?> initContainer() {
+        Slf4jLogConsumer logForwarder =
+                new Slf4jLogConsumer(DockerLoggerFactory.getLogger(OPENGAUSS_IMAGE));
+        String hostToContainerPort = String.format("%s:%s", OPEN_GAUSS_PORT, OPEN_GAUSS_PORT);
+
+        GenericContainer<?> openGauss =
+                new GenericContainer<>(OPENGAUSS_IMAGE)
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(OPEN_GAUSS_ALIASES)
+                        .withEnv("GS_PASSWORD", PASSWORD)
+                        .withLogConsumer(logForwarder);
+        openGauss.setPortBindings(Lists.newArrayList(hostToContainerPort));
+
+        return openGauss;
+    }
+
+    /**
+     * Creates and opens the {@link OpenGaussCatalog} used by the catalog tests.
+     *
+     * <p>The JDBC URL from the test case has {@code HOST} replaced with the host reported by
+     * {@code dbServer}. The existing {@code connection} is registered on the catalog for that
+     * URL before the catalog is opened.
+     */
+    @Override
+    protected void initCatalog() {
+        String reachableUrl = jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost());
+        OpenGaussCatalog openGaussCatalog =
+                new OpenGaussCatalog(
+                        DatabaseIdentifier.OPENGAUSS,
+                        jdbcCase.getUserName(),
+                        jdbcCase.getPassword(),
+                        JdbcUrlUtil.getUrlInfo(reachableUrl),
+                        SCHEMA);
+        catalog = openGaussCatalog;
+        // set connection
+        openGaussCatalog.setConnection(reachableUrl, connection);
+        catalog.open();
+    }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_opengauss_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_opengauss_source_and_sink.conf
new file mode 100644
index 0000000000..224fcb2403
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_opengauss_source_and_sink.conf
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Job-level settings: a single-parallelism batch job.
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+# Source: read public.gs_e2e_source_table over JDBC with the openGauss driver.
+source {
+ jdbc {
+ url = "jdbc:opengauss://e2e_OpenGauss:5432/postgres?loggerLevel=OFF"
+ driver = "org.opengauss.Driver"
+ connection_check_timeout_sec = 100
+ user = "gaussdb"
+ password = "openGauss@123"
+ table_path = "postgres.public.gs_e2e_source_table"
+ query = "select * from public.gs_e2e_source_table"
+ split.size = 10
+ }
+}
+
+# No transforms: rows go from source to sink unchanged.
+transform {
+}
+
+# Sink: write into public.gs_e2e_sink_table with generate_sink_sql enabled.
+sink {
+ jdbc {
+ # NOTE(review): stringtype=unspecified presumably makes the driver send string parameters
+ # untyped, as the PostgreSQL driver does - confirm for the openGauss driver.
+ url =
"jdbc:opengauss://e2e_OpenGauss:5432/postgres?loggerLevel=OFF&stringtype=unspecified"
+ driver = "org.opengauss.Driver"
+ user = "gaussdb"
+ password = "openGauss@123"
+ database = "postgres"
+ table = "public.gs_e2e_sink_table"
+ # NOTE(review): meaning of "postgresLow" is not visible here - see the JDBC sink docs.
+ compatible_mode = "postgresLow"
+ generate_sink_sql = true
+ }
+}