This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7ba11cecd [Feature][Connector-V2][Jdbc] Add oceanbase dialect factory
(#4989)
7ba11cecd is described below
commit 7ba11cecdf7ac70c65ff5df501bbce8b499967c9
Author: He Wang <[email protected]>
AuthorDate: Tue Jul 11 17:34:02 2023 +0800
[Feature][Connector-V2][Jdbc] Add oceanbase dialect factory (#4989)
---------
Co-authored-by: silenceland <[email protected]>
Co-authored-by: changhuyan <[email protected]>
---
docs/en/connector-v2/sink/Jdbc.md | 6 +
docs/en/connector-v2/sink/OceanBase.md | 186 +++++++++++++++
docs/en/connector-v2/source/Jdbc.md | 6 +
docs/en/connector-v2/source/OceanBase.md | 168 ++++++++++++++
.../jdbc/config/JdbcConnectionConfig.java | 13 ++
.../seatunnel/jdbc/config/JdbcOptions.java | 6 +
.../seatunnel/jdbc/config/JdbcSourceConfig.java | 2 +
.../jdbc/internal/dialect/JdbcDialectFactory.java | 10 +
.../jdbc/internal/dialect/JdbcDialectLoader.java | 5 +-
.../dialect/oceanbase/OceanBaseDialectFactory.java | 49 ++++
.../connectors/seatunnel/jdbc/sink/JdbcSink.java | 5 +-
.../seatunnel/jdbc/sink/JdbcSinkFactory.java | 9 +-
.../seatunnel/jdbc/source/JdbcSource.java | 4 +-
.../seatunnel/jdbc/source/JdbcSourceFactory.java | 9 +-
.../seatunnel/jdbc/JdbcOceanBaseITBase.java | 147 ++++++++++++
.../seatunnel/jdbc/JdbcOceanBaseMysqlIT.java | 256 +++++++++++++++++++++
.../seatunnel/jdbc/JdbcOceanBaseOracleIT.java | 161 +++++++++++++
.../jdbc_oceanbase_mysql_source_and_sink.conf | 55 +++++
.../jdbc_oceanbase_oracle_source_and_sink.conf | 53 +++++
.../e2e/connector/pulsar/PulsarBatchIT.java | 2 +
20 files changed, 1144 insertions(+), 8 deletions(-)
diff --git a/docs/en/connector-v2/sink/Jdbc.md
b/docs/en/connector-v2/sink/Jdbc.md
index d472d9a33..f128f6b4b 100644
--- a/docs/en/connector-v2/sink/Jdbc.md
+++ b/docs/en/connector-v2/sink/Jdbc.md
@@ -33,6 +33,7 @@ support `Xa transactions`. You can set `is_exactly_once=true`
to enable it.
| user | String | No | -
|
| password | String | No | -
|
| query | String | No | -
|
+| compatible_mode | String | No | -
|
| database | String | No | -
|
| table | String | No | -
|
| primary_keys | Array | No | -
|
@@ -69,6 +70,10 @@ The URL of the JDBC connection. Refer to a case:
jdbc:postgresql://localhost/tes
Use this sql write upstream input datas to database. e.g `INSERT ...`
+### compatible_mode [string]
+
+The compatible mode of database, required when the database supports multiple
compatible modes. For example, when using OceanBase database, you need to set
it to 'mysql' or 'oracle'.
+
### database [string]
Use this `database` and `table-name` auto-generate sql and receive upstream
input datas write to database.
@@ -168,6 +173,7 @@ there are some reference value for params above.
| Redshift | com.amazon.redshift.jdbc42.Driver |
jdbc:redshift://localhost:5439/testdb |
com.amazon.redshift.xa.RedshiftXADataSource |
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42
|
| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver |
jdbc:snowflake://<account_name>.snowflakecomputing.com | /
|
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc
|
| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 | /
|
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
+| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 | /
|
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar
|
## Example
diff --git a/docs/en/connector-v2/sink/OceanBase.md
b/docs/en/connector-v2/sink/OceanBase.md
new file mode 100644
index 000000000..ec87ce3d3
--- /dev/null
+++ b/docs/en/connector-v2/sink/OceanBase.md
@@ -0,0 +1,186 @@
+# OceanBase
+
+> JDBC OceanBase Sink Connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [cdc](../../concept/connector-v2-features.md)
+
+## Description
+
+Write data through jdbc. Support Batch mode and Streaming mode, support
concurrent writing, support exactly-once semantics.
+
+## Supported DataSource Info
+
+| Datasource | Supported versions | Driver |
Url | Maven
|
+|------------|--------------------------------|---------------------------|--------------------------------------|-------------------------------------------------------------------------------|
+| OceanBase | All OceanBase server versions. | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2883/test |
[Download](https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client) |
+
+## Database Dependency
+
+> Please download the driver jar listed under 'Maven' above and copy it to the
'$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory<br/>
+> For example: cp oceanbase-client-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+### Mysql Mode
+
+| Mysql Data type
|
SeaTunnel Data type
|
+|-----------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
+| BIT(1)
| BOOLEAN
|
+| TINYINT<br/>TINYINT UNSIGNED<br/>SMALLINT<br/>SMALLINT
UNSIGNED<br/>MEDIUMINT<br/>MEDIUMINT UNSIGNED<br/>INT<br/>INTEGER<br/>YEAR |
INT
|
+| INT UNSIGNED<br/>INTEGER UNSIGNED<br/>BIGINT
| BIGINT
|
+| BIGINT UNSIGNED
| DECIMAL(20,0)
|
+| DECIMAL(x,y)(Get the designated column's specified column size.<38)
| DECIMAL(x,y)
|
+| DECIMAL(x,y)(Get the designated column's specified column size.>38)
| DECIMAL(38,18)
|
+| DECIMAL UNSIGNED
| DECIMAL((Get the
designated column's specified column size)+1,<br/>(Gets the designated column's
number of digits to right of the decimal point.))) |
+| FLOAT<br/>FLOAT UNSIGNED
| FLOAT
|
+| DOUBLE<br/>DOUBLE UNSIGNED
| DOUBLE
|
+| CHAR<br/>VARCHAR<br/>TINYTEXT<br/>MEDIUMTEXT<br/>TEXT<br/>LONGTEXT<br/>JSON
| STRING
|
+| DATE
| DATE
|
+| TIME
| TIME
|
+| DATETIME<br/>TIMESTAMP
| TIMESTAMP
|
+|
TINYBLOB<br/>MEDIUMBLOB<br/>BLOB<br/>LONGBLOB<br/>BINARY<br/>VARBINARY<br/>BIT(n)
| BYTES
|
+| GEOMETRY<br/>UNKNOWN
| Not supported yet
|
+
+### Oracle Mode
+
+| Oracle Data type | SeaTunnel Data
type |
+|-----------------------------------------------------------|---------------------|
+| Number(p), p <= 9 | INT
|
+| Number(p), p <= 18 | BIGINT
|
+| Number(p), p > 18 | DECIMAL(38,18)
|
+| REAL<br/> BINARY_FLOAT | FLOAT
|
+| BINARY_DOUBLE | DOUBLE
|
+| CHAR<br/>NCHAR<br/>NVARCHAR2<br/>NCLOB<br/>CLOB<br/>ROWID | STRING
|
+| DATE | DATE
|
+| TIMESTAMP<br/>TIMESTAMP WITH LOCAL TIME ZONE | TIMESTAMP
|
+| BLOB<br/>RAW<br/>LONG RAW<br/>BFILE | BYTES
|
+| UNKNOWN | Not supported
yet |
+
+## Sink Options
+
+| Name | Type | Required | Default |
Description
|
+|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url | String | Yes | - |
The URL of the JDBC connection. Refer to a case:
jdbc:oceanbase://localhost:2883/test
|
+| driver | String | Yes | - |
The jdbc class name used to connect to the remote data source, should be
`com.oceanbase.jdbc.Driver`.
|
+| user | String | No | - |
Connection instance user name
|
+| password | String | No | - |
Connection instance password
|
+| query | String | No | - |
Use this SQL to write upstream input data to the database, e.g. `INSERT ...`. `query`
has the higher priority
|
+| compatible_mode | String | Yes | - |
The compatible mode of OceanBase, can be 'mysql' or 'oracle'.
|
+| database | String | No | - |
Use this `database` and `table-name` auto-generate sql and receive upstream
input datas write to database.<br/>This option is mutually exclusive with
`query` and has a higher priority.
|
+| table | String | No | - |
Use database and this table-name auto-generate sql and receive upstream input
datas write to database.<br/>This option is mutually exclusive with `query` and
has a higher priority. |
+| primary_keys | Array | No | - |
This option is used to support operations such as `insert`, `delete`, and
`update` when automatically generate sql.
|
+| support_upsert_by_query_primary_key_exist | Boolean | No | false |
Choose to use INSERT sql, UPDATE sql to process update events(INSERT,
UPDATE_AFTER) based on query primary key exists. This configuration is only
used when the database does not support upsert syntax. **Note**: this method has low
performance |
+| connection_check_timeout_sec | Int | No | 30 |
The time in seconds to wait for the database operation used to validate the
connection to complete.
|
+| max_retries | Int | No | 0 |
The number of retries to submit failed (executeBatch)
|
+| batch_size | Int | No | 1000 |
For batch writing, when the number of buffered records reaches the number of
`batch_size` or the time reaches `batch_interval_ms`<br/>, the data will be
flushed into the database
|
+| batch_interval_ms | Int | No | 1000 |
For batch writing, when the number of buffers reaches the number of
`batch_size` or the time reaches `batch_interval_ms`, the data will be flushed
into the database
|
+| generate_sink_sql | Boolean | No | false |
Generate sql statements based on the database table you want to write to
|
+| max_commit_attempts | Int | No | 3 |
The number of retries for transaction commit failures
|
+| transaction_timeout_sec | Int | No | -1 |
The timeout after the transaction is opened, the default is -1 (never timeout).
Note that setting the timeout may affect<br/>exactly-once semantics
|
+| auto_commit | Boolean | No | true |
Automatic transaction commit is enabled by default
|
+| common-options | | no | - |
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
|
+
+### Tips
+
+> If partition_column is not set, it will run in single concurrency, and if
partition_column is set, it will be executed in parallel according to the
concurrency of tasks.
+
+## Task Example
+
+### Simple:
+
+> This example defines a SeaTunnel synchronization task that automatically
generates data through FakeSource and sends it to JDBC Sink. FakeSource
generates a total of 16 rows of data (row.num=16), with each row having two
fields, name (string type) and age (int type). The final target table,
test_table, will also contain 16 rows of data. Before running this job, you
need to create database test and table test_table in your OceanBase. And if you have
not yet installed and deployed SeaTunne [...]
+
+```
+# Defining the runtime environment
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ parallelism = 1
+ result_table_name = "fake"
+ row.num = 16
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+ # If you would like to get more information about how to configure seatunnel
and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+transform {
+ # If you would like to get more information about how to configure seatunnel
and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/category/transform-v2
+}
+
+sink {
+ jdbc {
+ url = "jdbc:oceanbase://localhost:2883/test"
+ driver = "com.oceanbase.jdbc.Driver"
+ user = "root"
+ password = "123456"
+ compatible_mode = "mysql"
+ query = "insert into test_table(name,age) values(?,?)"
+ }
+ # If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}
+```
+
+### Generate Sink SQL
+
+> This example does not require you to write complex sql statements; you can configure
the database name and table name to automatically generate insert statements for you
+
+```
+sink {
+ jdbc {
+ url = "jdbc:oceanbase://localhost:2883/test"
+ driver = "com.oceanbase.jdbc.Driver"
+ user = "root"
+ password = "123456"
+ compatible_mode = "mysql"
+ # Automatically generate sql statements based on database table names
+ generate_sink_sql = true
+ database = test
+ table = test_table
+ }
+}
+```
+
+### CDC(Change Data Capture) Event
+
+> CDC change data is also supported. In this case, you need to configure
database, table and primary_keys.
+
+```
+sink {
+ jdbc {
+ url = "jdbc:oceanbase://localhost:2883/test"
+ driver = "com.oceanbase.jdbc.Driver"
+ user = "root"
+ password = "123456"
+ compatible_mode = "mysql"
+ generate_sink_sql = true
+ # You need to configure both database and table
+ database = test
+ table = sink_table
+ primary_keys = ["id","name"]
+ }
+}
+```
+
diff --git a/docs/en/connector-v2/source/Jdbc.md
b/docs/en/connector-v2/source/Jdbc.md
index 528114754..a324316e5 100644
--- a/docs/en/connector-v2/source/Jdbc.md
+++ b/docs/en/connector-v2/source/Jdbc.md
@@ -35,6 +35,7 @@ supports query SQL and can achieve projection effect.
| user | String | No | - |
| password | String | No | - |
| query | String | Yes | - |
+| compatible_mode | String | No | - |
| connection_check_timeout_sec | Int | No | 30 |
| partition_column | String | No | - |
| partition_upper_bound | Long | No | - |
@@ -63,6 +64,10 @@ The URL of the JDBC connection. Refer to a case:
jdbc:postgresql://localhost/tes
Query statement
+### compatible_mode [string]
+
+The compatible mode of database, required when the database supports multiple
compatible modes. For example, when using OceanBase database, you need to set
it to 'mysql' or 'oracle'.
+
### connection_check_timeout_sec [int]
The time in seconds to wait for the database operation used to validate the
connection to complete.
@@ -120,6 +125,7 @@ there are some reference value for params above.
| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver |
jdbc:snowflake://<account_name>.snowflakecomputing.com |
https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc
|
| Redshift | com.amazon.redshift.jdbc42.Driver |
jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000 |
https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42
|
| Vertica | com.vertica.jdbc.Driver |
jdbc:vertica://localhost:5433 |
https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar
|
+| OceanBase | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2881 |
https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar
|
## Example
diff --git a/docs/en/connector-v2/source/OceanBase.md
b/docs/en/connector-v2/source/OceanBase.md
new file mode 100644
index 000000000..9625ef4fb
--- /dev/null
+++ b/docs/en/connector-v2/source/OceanBase.md
@@ -0,0 +1,168 @@
+# OceanBase
+
+> JDBC OceanBase Source Connector
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Key Features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [x] [support user-defined split](../../concept/connector-v2-features.md)
+
+## Description
+
+Read external data source data through JDBC.
+
+## Supported DataSource Info
+
+| Datasource | Supported versions | Driver |
Url | Maven
|
+|------------|--------------------------------|---------------------------|--------------------------------------|-------------------------------------------------------------------------------|
+| OceanBase | All OceanBase server versions. | com.oceanbase.jdbc.Driver |
jdbc:oceanbase://localhost:2883/test |
[Download](https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client) |
+
+## Database Dependency
+
+> Please download the driver jar listed under 'Maven' above and copy it to the
'$SEATUNNEL_HOME/plugins/jdbc/lib/' working directory<br/>
+> For example: cp oceanbase-client-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
+
+## Data Type Mapping
+
+### Mysql Mode
+
+| Mysql Data type
|
SeaTunnel Data type
|
+|-----------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|
+| BIT(1)
| BOOLEAN
|
+| TINYINT<br/>TINYINT UNSIGNED<br/>SMALLINT<br/>SMALLINT
UNSIGNED<br/>MEDIUMINT<br/>MEDIUMINT UNSIGNED<br/>INT<br/>INTEGER<br/>YEAR |
INT
|
+| INT UNSIGNED<br/>INTEGER UNSIGNED<br/>BIGINT
| BIGINT
|
+| BIGINT UNSIGNED
| DECIMAL(20,0)
|
+| DECIMAL(x,y)(Get the designated column's specified column size.<38)
| DECIMAL(x,y)
|
+| DECIMAL(x,y)(Get the designated column's specified column size.>38)
| DECIMAL(38,18)
|
+| DECIMAL UNSIGNED
| DECIMAL((Get the
designated column's specified column size)+1,<br/>(Gets the designated column's
number of digits to right of the decimal point.))) |
+| FLOAT<br/>FLOAT UNSIGNED
| FLOAT
|
+| DOUBLE<br/>DOUBLE UNSIGNED
| DOUBLE
|
+| CHAR<br/>VARCHAR<br/>TINYTEXT<br/>MEDIUMTEXT<br/>TEXT<br/>LONGTEXT<br/>JSON
| STRING
|
+| DATE
| DATE
|
+| TIME
| TIME
|
+| DATETIME<br/>TIMESTAMP
| TIMESTAMP
|
+|
TINYBLOB<br/>MEDIUMBLOB<br/>BLOB<br/>LONGBLOB<br/>BINARY<br/>VARBINARY<br/>BIT(n)
| BYTES
|
+| GEOMETRY<br/>UNKNOWN
| Not supported yet
|
+
+### Oracle Mode
+
+| Oracle Data type | SeaTunnel Data
type |
+|-----------------------------------------------------------|---------------------|
+| Number(p), p <= 9 | INT
|
+| Number(p), p <= 18 | BIGINT
|
+| Number(p), p > 18 | DECIMAL(38,18)
|
+| REAL<br/> BINARY_FLOAT | FLOAT
|
+| BINARY_DOUBLE | DOUBLE
|
+| CHAR<br/>NCHAR<br/>NVARCHAR2<br/>NCLOB<br/>CLOB<br/>ROWID | STRING
|
+| DATE | DATE
|
+| TIMESTAMP<br/>TIMESTAMP WITH LOCAL TIME ZONE | TIMESTAMP
|
+| BLOB<br/>RAW<br/>LONG RAW<br/>BFILE | BYTES
|
+| UNKNOWN | Not supported
yet |
+
+## Source Options
+
+| Name | Type | Required | Default |
Description
|
+|------------------------------|--------|----------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| url | String | Yes | - | The URL
of the JDBC connection. Refer to a case: jdbc:oceanbase://localhost:2883/test
|
+| driver | String | Yes | - | The
jdbc class name used to connect to the remote data source, should be
`com.oceanbase.jdbc.Driver`.
|
+| user | String | No | - |
Connection instance user name
|
+| password | String | No | - |
Connection instance password
|
+| compatible_mode | String | Yes | - | The
compatible mode of OceanBase, can be 'mysql' or 'oracle'.
|
+| query | String | Yes | - | Query
statement
|
+| connection_check_timeout_sec | Int | No | 30 | The
time in seconds to wait for the database operation used to validate the
connection to complete
|
+| partition_column | String | No | - | The
column name for parallelism's partition, only support numeric type column and
string type column.
|
+| partition_lower_bound | Long | No | - | The
partition_column min value for scan, if not set SeaTunnel will query database
get min value.
|
+| partition_upper_bound | Long | No | - | The
partition_column max value for scan, if not set SeaTunnel will query database
get max value.
|
+| partition_num | Int | No | job parallelism | The
number of partition count, only support positive integer. Default value is job
parallelism.
|
+| fetch_size | Int | No | 0 | For
queries that return a large number of objects, you can configure <br/> the row
fetch size used in the query to improve performance by <br/> reducing the
number of database hits required to satisfy the selection criteria.<br/> Zero
means use jdbc default value. |
+| common-options | | No | - | Source
plugin common parameters, please refer to [Source Common
Options](common-options.md) for details
|
+
+### Tips
+
+> If partition_column is not set, it will run in single concurrency, and if
partition_column is set, it will be executed in parallel according to the
concurrency of tasks.
+
+## Task Example
+
+### Simple:
+
+```
+env {
+ execution.parallelism = 2
+ job.mode = "BATCH"
+}
+
+source {
+ Jdbc {
+ driver = "com.oceanbase.jdbc.Driver"
+ url =
"jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
+ user = "root"
+ password = ""
+ compatible_mode = "mysql"
+ query = "select * from source"
+ }
+}
+
+transform {
+ # If you would like to get more information about how to configure
seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Console {}
+}
+```
+
+### Parallel:
+
+> Read the queried table in parallel, split on the shard field (partition_column) you
configured. You can do this if you want to read the whole table
+
+```
+source {
+ Jdbc {
+ driver = "com.oceanbase.jdbc.Driver"
+ url =
"jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
+ user = "root"
+ password = ""
+ compatible_mode = "mysql"
+ query = "select * from source"
+ # Parallel sharding reads fields
+ partition_column = "id"
+ # Number of fragments
+ partition_num = 10
+ }
+}
+```
+
+### Parallel Boundary:
+
+> It is more efficient to read your data source according to the upper and
lower boundaries you configured
+
+```
+source {
+ Jdbc {
+ driver = "com.oceanbase.jdbc.Driver"
+ url =
"jdbc:oceanbase://localhost:2883/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"
+ user = "root"
+ password = ""
+ compatible_mode = "mysql"
+ query = "select * from source"
+ partition_column = "id"
+ partition_num = 10
+ # Read start boundary
+ partition_lower_bound = 1
+ # Read end boundary
+ partition_upper_bound = 500
+ }
+}
+```
+
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
index afceddc59..6e2147c03 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcConnectionConfig.java
@@ -27,6 +27,7 @@ public class JdbcConnectionConfig implements Serializable {
public String url;
public String driverName;
+ public String compatibleMode;
public int connectionCheckTimeoutSeconds =
JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue();
public int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue();
@@ -48,6 +49,7 @@ public class JdbcConnectionConfig implements Serializable {
public static JdbcConnectionConfig of(ReadonlyConfig config) {
JdbcConnectionConfig.Builder builder = JdbcConnectionConfig.builder();
builder.url(config.get(JdbcOptions.URL));
+ builder.compatibleMode(config.get(JdbcOptions.COMPATIBLE_MODE));
builder.driverName(config.get(JdbcOptions.DRIVER));
builder.autoCommit(config.get(JdbcOptions.AUTO_COMMIT));
builder.maxRetries(config.get(JdbcOptions.MAX_RETRIES));
@@ -74,6 +76,10 @@ public class JdbcConnectionConfig implements Serializable {
return driverName;
}
+ public String getCompatibleMode() {
+ return compatibleMode;
+ }
+
public boolean isAutoCommit() {
return autoCommit;
}
@@ -121,6 +127,7 @@ public class JdbcConnectionConfig implements Serializable {
public static final class Builder {
private String url;
private String driverName;
+ private String compatibleMode;
private int connectionCheckTimeoutSeconds =
JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC.defaultValue();
private int maxRetries = JdbcOptions.MAX_RETRIES.defaultValue();
@@ -146,6 +153,11 @@ public class JdbcConnectionConfig implements Serializable {
return this;
}
+ public Builder compatibleMode(String compatibleMode) {
+ this.compatibleMode = compatibleMode;
+ return this;
+ }
+
public Builder connectionCheckTimeoutSeconds(int
connectionCheckTimeoutSeconds) {
this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
return this;
@@ -206,6 +218,7 @@ public class JdbcConnectionConfig implements Serializable {
jdbcConnectionConfig.batchSize = this.batchSize;
jdbcConnectionConfig.batchIntervalMs = this.batchIntervalMs;
jdbcConnectionConfig.driverName = this.driverName;
+ jdbcConnectionConfig.compatibleMode = this.compatibleMode;
jdbcConnectionConfig.maxRetries = this.maxRetries;
jdbcConnectionConfig.password = this.password;
jdbcConnectionConfig.connectionCheckTimeoutSeconds =
this.connectionCheckTimeoutSeconds;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
index 87b2a7b46..24ae0580f 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcOptions.java
@@ -36,6 +36,12 @@ public interface JdbcOptions {
.intType()
.defaultValue(30)
.withDescription("connection check time second");
+ Option<String> COMPATIBLE_MODE =
+ Options.key("compatible_mode")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The compatible mode of database, required when
the database supports multiple compatible modes. For example, when using
OceanBase database, you need to set it to 'mysql' or 'oracle'.");
Option<Integer> MAX_RETRIES =
Options.key("max_retries").intType().defaultValue(0).withDescription("max_retired");
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
index 4c6221549..00130b32a 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/config/JdbcSourceConfig.java
@@ -33,6 +33,7 @@ public class JdbcSourceConfig implements Serializable {
private JdbcConnectionConfig jdbcConnectionConfig;
public String query;
+ public String compatibleMode;
private String partitionColumn;
private BigDecimal partitionUpperBound;
private BigDecimal partitionLowerBound;
@@ -44,6 +45,7 @@ public class JdbcSourceConfig implements Serializable {
builder.jdbcConnectionConfig(JdbcConnectionConfig.of(config));
builder.query(config.get(JdbcOptions.QUERY));
builder.fetchSize(config.get(JdbcOptions.FETCH_SIZE));
+
config.getOptional(JdbcOptions.COMPATIBLE_MODE).ifPresent(builder::compatibleMode);
config.getOptional(JdbcOptions.PARTITION_COLUMN).ifPresent(builder::partitionColumn);
config.getOptional(JdbcOptions.PARTITION_UPPER_BOUND)
.ifPresent(builder::partitionUpperBound);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
index 5e5ae1b55..3d66de659 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectFactory.java
@@ -37,4 +37,14 @@ public interface JdbcDialectFactory {
/** @return Creates a new instance of the {@link JdbcDialect}. */
JdbcDialect create();
+
+ /**
+ * Create a {@link JdbcDialect} instance based on the driver type and
compatible mode.
+ *
+ * @param compatibleMode The compatible mode
+ * @return a new instance of {@link JdbcDialect}
+ */
+ default JdbcDialect create(String compatibleMode) {
+ return create();
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
index 076a6734b..b49df35ff 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialectLoader.java
@@ -40,11 +40,12 @@ public final class JdbcDialectLoader {
* Loads the unique JDBC Dialect that can handle the given database url.
*
* @param url A database URL.
+ * @param compatibleMode The compatible mode.
* @throws IllegalStateException if the loader cannot find exactly one
dialect that can
* unambiguously process the given database URL.
* @return The loaded dialect.
*/
- public static JdbcDialect load(String url) {
+ public static JdbcDialect load(String url, String compatibleMode) {
ClassLoader cl = Thread.currentThread().getContextClassLoader();
List<JdbcDialectFactory> foundFactories = discoverFactories(cl);
@@ -89,7 +90,7 @@ public final class JdbcDialectLoader {
.collect(Collectors.joining("\n"))));
}
- return matchingFactories.get(0).create();
+ return matchingFactories.get(0).create(compatibleMode);
}
private static List<JdbcDialectFactory> discoverFactories(ClassLoader
classLoader) {
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
new file mode 100644
index 000000000..66df84205
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/oceanbase/OceanBaseDialectFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oceanbase;
+
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MysqlDialect;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialect;
+
+import com.google.auto.service.AutoService;
+
+import javax.annotation.Nonnull;
+
+@AutoService(JdbcDialectFactory.class)
+public class OceanBaseDialectFactory implements JdbcDialectFactory {
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:oceanbase:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ throw new UnsupportedOperationException(
+ "Can't create JdbcDialect without compatible mode for
OceanBase");
+ }
+
+ @Override
+ public JdbcDialect create(@Nonnull String compatibleMode) {
+ if ("oracle".equalsIgnoreCase(compatibleMode)) {
+ return new OracleDialect();
+ }
+ return new MysqlDialect();
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 4221172b1..4666eae1e 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -104,7 +104,10 @@ public class JdbcSink
public void prepare(Config pluginConfig) throws PrepareFailException {
this.config = ReadonlyConfig.fromConfig(pluginConfig);
this.jdbcSinkConfig = JdbcSinkConfig.of(config);
- this.dialect =
JdbcDialectLoader.load(jdbcSinkConfig.getJdbcConnectionConfig().getUrl());
+ this.dialect =
+ JdbcDialectLoader.load(
+ jdbcSinkConfig.getJdbcConnectionConfig().getUrl(),
+
jdbcSinkConfig.getJdbcConnectionConfig().getCompatibleMode());
this.dataSaveMode = DataSaveMode.KEEP_SCHEMA_AND_DATA;
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index ae2e49b1e..a9bb1c155 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -41,6 +41,7 @@ import java.util.Optional;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.AUTO_COMMIT;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_INTERVAL_MS;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.BATCH_SIZE;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.COMPATIBLE_MODE;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DATABASE;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DRIVER;
@@ -82,7 +83,10 @@ public class JdbcSinkFactory implements TableSinkFactory {
}
final ReadonlyConfig options = config;
JdbcSinkConfig sinkConfig = JdbcSinkConfig.of(config);
- JdbcDialect dialect =
JdbcDialectLoader.load(sinkConfig.getJdbcConnectionConfig().getUrl());
+ JdbcDialect dialect =
+ JdbcDialectLoader.load(
+ sinkConfig.getJdbcConnectionConfig().getUrl(),
+
sinkConfig.getJdbcConnectionConfig().getCompatibleMode());
return () ->
new JdbcSink(
options,
@@ -106,7 +110,8 @@ public class JdbcSinkFactory implements TableSinkFactory {
GENERATE_SINK_SQL,
AUTO_COMMIT,
SUPPORT_UPSERT_BY_QUERY_PRIMARY_KEY_EXIST,
- PRIMARY_KEYS)
+ PRIMARY_KEYS,
+ COMPATIBLE_MODE)
.conditional(
IS_EXACTLY_ONCE,
true,
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
index 39deac1ef..732892b21 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSource.java
@@ -99,7 +99,9 @@ public class JdbcSource
new
SimpleJdbcConnectionProvider(jdbcSourceConfig.getJdbcConnectionConfig());
this.query = jdbcSourceConfig.getQuery();
this.jdbcDialect =
-
JdbcDialectLoader.load(jdbcSourceConfig.getJdbcConnectionConfig().getUrl());
+ JdbcDialectLoader.load(
+ jdbcSourceConfig.getJdbcConnectionConfig().getUrl(),
+
jdbcSourceConfig.getJdbcConnectionConfig().getCompatibleMode());
try (Connection connection =
jdbcConnectionProvider.getOrEstablishConnection()) {
this.typeInfo = initTableField(connection);
this.partitionParameter =
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
index 8f9605182..43aa1c03d 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
@@ -54,6 +54,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.COMPATIBLE_MODE;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SEC;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.DRIVER;
import static
org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions.FETCH_SIZE;
@@ -83,7 +84,10 @@ public class JdbcSourceFactory implements TableSourceFactory
{
JdbcConnectionProvider connectionProvider =
new
SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig());
final String querySql = config.getQuery();
- JdbcDialect dialect =
JdbcDialectLoader.load(config.getJdbcConnectionConfig().getUrl());
+ JdbcDialect dialect =
+ JdbcDialectLoader.load(
+ config.getJdbcConnectionConfig().getUrl(),
+ config.getJdbcConnectionConfig().getCompatibleMode());
TableSchema tableSchema = catalogTable.getTableSchema();
SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
Optional<PartitionParameter> partitionParameter =
@@ -228,7 +232,8 @@ public class JdbcSourceFactory implements
TableSourceFactory {
PARTITION_COLUMN,
PARTITION_UPPER_BOUND,
PARTITION_LOWER_BOUND,
- PARTITION_NUM)
+ PARTITION_NUM,
+ COMPATIBLE_MODE)
.build();
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
new file mode 100644
index 000000000..b8202e697
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.Assertions;
+import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public abstract class JdbcOceanBaseITBase extends AbstractJdbcIT {
+
+ private static final String OCEANBASE_DATABASE = "seatunnel";
+ private static final String OCEANBASE_SOURCE = "source";
+ private static final String OCEANBASE_SINK = "sink";
+
+ private static final String OCEANBASE_JDBC_TEMPLATE = "jdbc:oceanbase://"
+ HOST + ":%s";
+ private static final String OCEANBASE_DRIVER_CLASS =
"com.oceanbase.jdbc.Driver";
+
+ abstract String imageName();
+
+ abstract String host();
+
+ abstract int port();
+
+ abstract String username();
+
+ abstract String password();
+
+ abstract List<String> configFile();
+
+ abstract String createSqlTemplate();
+
+ abstract String[] getFieldNames();
+
+ @Override
+ JdbcCase getJdbcCase() {
+ Map<String, String> containerEnv = new HashMap<>();
+ String jdbcUrl = String.format(OCEANBASE_JDBC_TEMPLATE, port());
+ Pair<String[], List<SeaTunnelRow>> testDataSet = initTestData();
+ String[] fieldNames = testDataSet.getKey();
+
+ String insertSql = insertTable(OCEANBASE_DATABASE, OCEANBASE_SOURCE,
fieldNames);
+
+ return JdbcCase.builder()
+ .dockerImage(imageName())
+ .networkAliases(host())
+ .containerEnv(containerEnv)
+ .driverClass(OCEANBASE_DRIVER_CLASS)
+ .host(HOST)
+ .port(port())
+ .localPort(port())
+ .jdbcTemplate(OCEANBASE_JDBC_TEMPLATE)
+ .jdbcUrl(jdbcUrl)
+ .userName(username())
+ .password(password())
+ .database(OCEANBASE_DATABASE)
+ .sourceTable(OCEANBASE_SOURCE)
+ .sinkTable(OCEANBASE_SINK)
+ .createSql(createSqlTemplate())
+ .configFile(configFile())
+ .insertSql(insertSql)
+ .testData(testDataSet)
+ .build();
+ }
+
+ @Override
+ void compareResult() {
+ String sourceSql =
+ String.format(
+ "select * from %s.%s order by 1", OCEANBASE_DATABASE,
OCEANBASE_SOURCE);
+ String sinkSql =
+ String.format("select * from %s.%s order by 1",
OCEANBASE_DATABASE, OCEANBASE_SINK);
+ try {
+ Statement sourceStatement = connection.createStatement();
+ Statement sinkStatement = connection.createStatement();
+ ResultSet sourceResultSet =
sourceStatement.executeQuery(sourceSql);
+ ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+ Assertions.assertEquals(
+ sourceResultSet.getMetaData().getColumnCount(),
+ sinkResultSet.getMetaData().getColumnCount());
+ while (sourceResultSet.next()) {
+ if (sinkResultSet.next()) {
+ for (String column : getFieldNames()) {
+ Object source = sourceResultSet.getObject(column);
+ Object sink = sinkResultSet.getObject(column);
+ if (!Objects.deepEquals(source, sink)) {
+ InputStream sourceAsciiStream =
sourceResultSet.getBinaryStream(column);
+ InputStream sinkAsciiStream =
sinkResultSet.getBinaryStream(column);
+ String sourceValue =
+ IOUtils.toString(sourceAsciiStream,
StandardCharsets.UTF_8);
+ String sinkValue =
+ IOUtils.toString(sinkAsciiStream,
StandardCharsets.UTF_8);
+ Assertions.assertEquals(sourceValue, sinkValue);
+ }
+ }
+ }
+ }
+ sourceResultSet.last();
+ sinkResultSet.last();
+ } catch (Exception e) {
+ throw new RuntimeException("Compare result error", e);
+ }
+ }
+
+ @Override
+ String driverUrl() {
+ return
"https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar";
+ }
+
+ @Override
+ protected void createSchemaIfNeeded() {
+ String sql = "CREATE DATABASE IF NOT EXISTS " + OCEANBASE_DATABASE;
+ try {
+ connection.prepareStatement(sql).executeUpdate();
+ } catch (Exception e) {
+ throw new SeaTunnelRuntimeException(
+ JdbcITErrorCode.CREATE_TABLE_FAILED, "Fail to execute sql
" + sql, e);
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
new file mode 100644
index 000000000..548fecaee
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseMysqlIT.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.Disabled;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerLoggerFactory;
+
+import com.google.common.collect.Lists;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+@Disabled("Disabled due to insufficient hardware resources in the CI
environment")
+public class JdbcOceanBaseMysqlIT extends JdbcOceanBaseITBase {
+
+ @Override
+ String imageName() {
+ return "oceanbase/oceanbase-ce:4.0.0.0";
+ }
+
+ @Override
+ String host() {
+ return "e2e_oceanbase_mysql";
+ }
+
+ @Override
+ int port() {
+ return 2881;
+ }
+
+ @Override
+ String username() {
+ return "root";
+ }
+
+ @Override
+ String password() {
+ return "";
+ }
+
+ @Override
+ List<String> configFile() {
+ return
Lists.newArrayList("/jdbc_oceanbase_mysql_source_and_sink.conf");
+ }
+
+ @Override
+ String createSqlTemplate() {
+ return "CREATE TABLE IF NOT EXISTS %s\n"
+ + "(\n"
+ + " `c_bit_1` bit(1) DEFAULT
NULL,\n"
+ + " `c_bit_8` bit(8) DEFAULT
NULL,\n"
+ + " `c_bit_16` bit(16) DEFAULT
NULL,\n"
+ + " `c_bit_32` bit(32) DEFAULT
NULL,\n"
+ + " `c_bit_64` bit(64) DEFAULT
NULL,\n"
+ + " `c_boolean` tinyint(1) DEFAULT
NULL,\n"
+ + " `c_tinyint` tinyint(4) DEFAULT
NULL,\n"
+ + " `c_tinyint_unsigned` tinyint(3) unsigned DEFAULT
NULL,\n"
+ + " `c_smallint` smallint(6) DEFAULT
NULL,\n"
+ + " `c_smallint_unsigned` smallint(5) unsigned DEFAULT
NULL,\n"
+ + " `c_mediumint` mediumint(9) DEFAULT
NULL,\n"
+ + " `c_mediumint_unsigned` mediumint(8) unsigned DEFAULT
NULL,\n"
+ + " `c_int` int(11) DEFAULT
NULL,\n"
+ + " `c_integer` int(11) DEFAULT
NULL,\n"
+ + " `c_bigint` bigint(20) DEFAULT
NULL,\n"
+ + " `c_bigint_unsigned` bigint(20) unsigned DEFAULT
NULL,\n"
+ + " `c_decimal` decimal(20, 0) DEFAULT
NULL,\n"
+ + " `c_decimal_unsigned` decimal(38, 18) DEFAULT
NULL,\n"
+ + " `c_float` float DEFAULT
NULL,\n"
+ + " `c_float_unsigned` float unsigned DEFAULT
NULL,\n"
+ + " `c_double` double DEFAULT
NULL,\n"
+ + " `c_double_unsigned` double unsigned DEFAULT
NULL,\n"
+ + " `c_char` char(1) DEFAULT
NULL,\n"
+ + " `c_tinytext` tinytext,\n"
+ + " `c_mediumtext` mediumtext,\n"
+ + " `c_text` text,\n"
+ + " `c_varchar` varchar(255) DEFAULT
NULL,\n"
+ + " `c_json` json DEFAULT
NULL,\n"
+ + " `c_longtext` longtext,\n"
+ + " `c_date` date DEFAULT
NULL,\n"
+ + " `c_datetime` datetime DEFAULT
NULL,\n"
+ + " `c_timestamp` timestamp NULL DEFAULT
NULL,\n"
+ + " `c_tinyblob` tinyblob,\n"
+ + " `c_mediumblob` mediumblob,\n"
+ + " `c_blob` blob,\n"
+ + " `c_longblob` longblob,\n"
+ + " `c_varbinary` varbinary(255) DEFAULT
NULL,\n"
+ + " `c_binary` binary(1) DEFAULT
NULL,\n"
+ + " `c_year` year(4) DEFAULT
NULL,\n"
+ + " `c_int_unsigned` int(10) unsigned DEFAULT
NULL,\n"
+ + " `c_integer_unsigned` int(10) unsigned DEFAULT
NULL,\n"
+ + " `c_bigint_30` BIGINT(40) unsigned DEFAULT
NULL,\n"
+ + " `c_decimal_unsigned_30` DECIMAL(30) unsigned DEFAULT
NULL,\n"
+ + " `c_decimal_30` DECIMAL(30) DEFAULT
NULL\n"
+ + ");";
+ }
+
+ @Override
+ String[] getFieldNames() {
+ return new String[] {
+ "c_bit_1",
+ "c_bit_8",
+ "c_bit_16",
+ "c_bit_32",
+ "c_bit_64",
+ "c_boolean",
+ "c_tinyint",
+ "c_tinyint_unsigned",
+ "c_smallint",
+ "c_smallint_unsigned",
+ "c_mediumint",
+ "c_mediumint_unsigned",
+ "c_int",
+ "c_integer",
+ "c_year",
+ "c_int_unsigned",
+ "c_integer_unsigned",
+ "c_bigint",
+ "c_bigint_unsigned",
+ "c_decimal",
+ "c_decimal_unsigned",
+ "c_float",
+ "c_float_unsigned",
+ "c_double",
+ "c_double_unsigned",
+ "c_char",
+ "c_tinytext",
+ "c_mediumtext",
+ "c_text",
+ "c_varchar",
+ "c_json",
+ "c_longtext",
+ "c_date",
+ "c_datetime",
+ "c_timestamp",
+ "c_tinyblob",
+ "c_mediumblob",
+ "c_blob",
+ "c_longblob",
+ "c_varbinary",
+ "c_binary",
+ "c_bigint_30",
+ "c_decimal_unsigned_30",
+ "c_decimal_30",
+ };
+ }
+
+ @Override
+ Pair<String[], List<SeaTunnelRow>> initTestData() {
+ String[] fieldNames = getFieldNames();
+
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ BigDecimal bigintValue = new BigDecimal("2844674407371055000");
+ BigDecimal decimalValue = new
BigDecimal("999999999999999999999999999899");
+ for (int i = 0; i < 100; i++) {
+ byte byteArr = Integer.valueOf(i).byteValue();
+ SeaTunnelRow row =
+ new SeaTunnelRow(
+ new Object[] {
+ i % 2 == 0 ? (byte) 1 : (byte) 0,
+ new byte[] {byteArr},
+ new byte[] {byteArr, byteArr},
+ new byte[] {byteArr, byteArr, byteArr,
byteArr},
+ new byte[] {
+ byteArr, byteArr, byteArr, byteArr,
byteArr, byteArr, byteArr,
+ byteArr
+ },
+ i % 2 == 0 ? Boolean.TRUE : Boolean.FALSE,
+ i,
+ i,
+ i,
+ i,
+ i,
+ i,
+ i,
+ i,
+ i,
+ Long.parseLong("1"),
+ Long.parseLong("1"),
+ Long.parseLong("1"),
+ BigDecimal.valueOf(i, 0),
+ BigDecimal.valueOf(i, 18),
+ BigDecimal.valueOf(i, 18),
+ Float.parseFloat("1.1"),
+ Float.parseFloat("1.1"),
+ Double.parseDouble("1.1"),
+ Double.parseDouble("1.1"),
+ "f",
+ String.format("f1_%s", i),
+ String.format("f1_%s", i),
+ String.format("f1_%s", i),
+ String.format("f1_%s", i),
+ String.format("{\"aa\":\"bb_%s\"}", i),
+ String.format("f1_%s", i),
+ Date.valueOf(LocalDate.now()),
+ Timestamp.valueOf(LocalDateTime.now()),
+ new Timestamp(System.currentTimeMillis()),
+ "test".getBytes(),
+ "test".getBytes(),
+ "test".getBytes(),
+ "test".getBytes(),
+ "test".getBytes(),
+ "f".getBytes(),
+ bigintValue.add(BigDecimal.valueOf(i)),
+ decimalValue.add(BigDecimal.valueOf(i)),
+ decimalValue.add(BigDecimal.valueOf(i)),
+ });
+ rows.add(row);
+ }
+
+ return Pair.of(fieldNames, rows);
+ }
+
+ @Override
+ GenericContainer<?> initContainer() {
+ GenericContainer<?> container =
+ new GenericContainer<>(imageName())
+ .withNetwork(NETWORK)
+ .withNetworkAliases(host())
+ .waitingFor(Wait.forLogMessage(".*boot success!.*", 1))
+ .withStartupTimeout(Duration.ofMinutes(5))
+ .withLogConsumer(
+ new
Slf4jLogConsumer(DockerLoggerFactory.getLogger(imageName())));
+
+ container.setPortBindings(Lists.newArrayList(String.format("%s:%s",
port(), port())));
+
+ return container;
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
new file mode 100644
index 000000000..4c3cca5dd
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseOracleIT.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.jdbc;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.jupiter.api.Disabled;
+import org.testcontainers.containers.GenericContainer;
+
+import com.google.common.collect.Lists;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.given;
+
+@Disabled("Oracle mode of OceanBase Enterprise Edition does not provide docker
environment")
+public class JdbcOceanBaseOracleIT extends JdbcOceanBaseITBase {
+
+ @Override
+ String imageName() {
+ return null;
+ }
+
+ @Override
+ String host() {
+ return "e2e_oceanbase_oracle";
+ }
+
+ @Override
+ int port() {
+ return 2883;
+ }
+
+ @Override
+ String username() {
+ return "root";
+ }
+
+ @Override
+ String password() {
+ return "";
+ }
+
+ @Override
+ List<String> configFile() {
+ return
Lists.newArrayList("/jdbc_oceanbase_oracle_source_and_sink.conf");
+ }
+
+ @Override
+ GenericContainer<?> initContainer() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void startUp() {
+ jdbcCase = getJdbcCase();
+
+ given().ignoreExceptions()
+ .await()
+ .atMost(360, TimeUnit.SECONDS)
+ .untilAsserted(() ->
this.initializeJdbcConnection(jdbcCase.getJdbcUrl()));
+
+ createSchemaIfNeeded();
+ createNeededTables();
+ insertTestData();
+ }
+
+ @Override
+ public String quoteIdentifier(String field) {
+ return "\"" + field + "\"";
+ }
+
+ @Override
+ String createSqlTemplate() {
+ return "create table %s\n"
+ + "(\n"
+ + " VARCHAR_10_COL varchar2(10),\n"
+ + " CHAR_10_COL char(10),\n"
+ + " CLOB_COL clob,\n"
+ + " NUMBER_3_SF_2_DP number(3, 2),\n"
+ + " INTEGER_COL integer,\n"
+ + " FLOAT_COL float(10),\n"
+ + " REAL_COL real,\n"
+ + " BINARY_FLOAT_COL binary_float,\n"
+ + " BINARY_DOUBLE_COL binary_double,\n"
+ + " DATE_COL date,\n"
+ + " TIMESTAMP_WITH_3_FRAC_SEC_COL timestamp(3),\n"
+ + " TIMESTAMP_WITH_LOCAL_TZ timestamp with local time
zone\n"
+ + ")";
+ }
+
+ @Override
+ String[] getFieldNames() {
+ return new String[] {
+ "VARCHAR_10_COL",
+ "CHAR_10_COL",
+ "CLOB_COL",
+ "NUMBER_3_SF_2_DP",
+ "INTEGER_COL",
+ "FLOAT_COL",
+ "REAL_COL",
+ "BINARY_FLOAT_COL",
+ "BINARY_DOUBLE_COL",
+ "DATE_COL",
+ "TIMESTAMP_WITH_3_FRAC_SEC_COL",
+ "TIMESTAMP_WITH_LOCAL_TZ"
+ };
+ }
+
+ @Override
+ Pair<String[], List<SeaTunnelRow>> initTestData() {
+ String[] fieldNames = getFieldNames();
+
+ List<SeaTunnelRow> rows = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ SeaTunnelRow row =
+ new SeaTunnelRow(
+ new Object[] {
+ String.format("f%s", i),
+ String.format("f%s", i),
+ String.format("f%s", i),
+ BigDecimal.valueOf(1.1),
+ i,
+ Float.parseFloat("2.2"),
+ Float.parseFloat("2.2"),
+ Float.parseFloat("22.2"),
+ Double.parseDouble("2.2"),
+ Date.valueOf(LocalDate.now()),
+ Timestamp.valueOf(LocalDateTime.now()),
+ Timestamp.valueOf(LocalDateTime.now())
+ });
+ rows.add(row);
+ }
+
+ return Pair.of(fieldNames, rows);
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf
new file mode 100644
index 000000000..098d3ffae
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_mysql_source_and_sink.conf
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
source plugin feature**
+ Jdbc {
+ driver = com.oceanbase.jdbc.Driver
+ url =
"jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC"
+ user = root
+ password = ""
+ query = "SELECT c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64, c_boolean,
c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned, c_mediumint,
c_mediumint_unsigned, c_int, c_integer, c_bigint, c_bigint_unsigned, c_decimal,
c_decimal_unsigned, c_float, c_float_unsigned, c_double, c_double_unsigned,
c_char, c_tinytext, c_mediumtext, c_text, c_varchar, c_json, c_longtext,
c_date, c_datetime, c_timestamp, c_tinyblob, c_mediumblob, c_blob, c_longblob,
c_varbinary, c_binary, c_yea [...]
+ compatible_mode = "mysql"
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see the full list of source plugins,
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
+}
+
+sink {
+ Jdbc {
+ driver = com.oceanbase.jdbc.Driver
+ url =
"jdbc:oceanbase://e2e_oceanbase_mysql:2881/seatunnel?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true&serverTimezone=UTC"
+ user = root
+ password = ""
+ query = "insert into sink(c_bit_1, c_bit_8, c_bit_16, c_bit_32, c_bit_64,
c_boolean, c_tinyint, c_tinyint_unsigned, c_smallint, c_smallint_unsigned,
c_mediumint, c_mediumint_unsigned, c_int, c_integer, c_bigint,
c_bigint_unsigned, c_decimal, c_decimal_unsigned, c_float, c_float_unsigned,
c_double, c_double_unsigned, c_char, c_tinytext, c_mediumtext, c_text,
c_varchar, c_json, c_longtext, c_date, c_datetime, c_timestamp, c_tinyblob,
c_mediumblob, c_blob, c_longblob, c_varbinary, c_bin [...]
+ compatible_mode = "mysql"
+ }
+ # If you would like to get more information about how to configure seatunnel
and see the full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf
new file mode 100644
index 000000000..bf2b1ccf0
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/resources/jdbc_oceanbase_oracle_source_and_sink.conf
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ jdbc{
+ # This is an example source plugin **only for testing and demonstrating the
source plugin feature**
+ url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel"
+ driver = com.oceanbase.jdbc.Driver
+ user = "root"
+ password = ""
+ query = "SELECT
VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ
FROM source"
+ compatible_mode = "oracle"
+ }
+}
+
+transform {
+}
+
+sink {
+ jdbc{
+ url = "jdbc:oceanbase://e2e_oceanbase_oracle:2883/seatunnel"
+ driver = com.oceanbase.jdbc.Driver
+ user = "root"
+ password = ""
+ query = "INSERT INTO sink
(VARCHAR_10_COL,CHAR_10_COL,CLOB_COL,NUMBER_3_SF_2_DP,INTEGER_COL,FLOAT_COL,REAL_COL,BINARY_FLOAT_COL,BINARY_DOUBLE_COL,DATE_COL,TIMESTAMP_WITH_3_FRAC_SEC_COL,TIMESTAMP_WITH_LOCAL_TZ)
VALUES(?,?,?,?,?,?,?,?,?,?,?,?)"
+ compatible_mode = "oracle"
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
index b1ea69efa..092f37f9b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/PulsarBatchIT.java
@@ -56,6 +56,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -113,6 +114,7 @@ public class PulsarBatchIT extends TestSuiteBase implements
TestResource {
new PulsarContainer(DockerImageName.parse(PULSAR_IMAGE_NAME))
.withNetwork(NETWORK)
.withNetworkAliases(PULSAR_HOST)
+ .withStartupTimeout(Duration.ofMinutes(3))
.withLogConsumer(
new Slf4jLogConsumer(
DockerLoggerFactory.getLogger(PULSAR_IMAGE_NAME)));