This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 34a57c70bc [Feature][Connectors-v2] Support timestamp startup mode for Oracle and SQLServer CDC (#10428)
34a57c70bc is described below
commit 34a57c70bc06e13e0d2e07cdcda7f3d67ba2e559
Author: panda <[email protected]>
AuthorDate: Thu Mar 5 20:38:40 2026 +0800
[Feature][Connectors-v2] Support timestamp startup mode for Oracle and SQLServer CDC (#10428)
---
docs/en/connectors/source/Oracle-CDC.md | 5 +-
docs/en/connectors/source/SqlServer-CDC.md | 8 +-
docs/zh/connectors/source/Oracle-CDC.md | 334 +++++++++++++++++++--
docs/zh/connectors/source/SqlServer-CDC.md | 6 +-
.../source/OracleIncrementalSourceOptions.java | 9 +-
.../oracle/source/offset/RedoLogOffsetFactory.java | 7 +-
.../cdc/oracle/utils/OracleConnectionUtils.java | 44 +++
.../source/SqlServerIncrementalSourceOptions.java | 1 +
.../sqlserver/source/offset/LsnOffsetFactory.java | 9 +-
.../cdc/sqlserver/utils/SqlServerUtils.java | 80 +++++
.../cdc/sqlserver/utils/SqlServerUtilsTest.java | 12 +
.../seatunnel/cdc/oracle/OracleCDCIT.java | 59 +++-
.../resources/oraclecdc_to_oracle_timestamp.conf | 57 ++++
.../connector/cdc/sqlserver/SqlServerCDCIT.java | 56 ++++
.../sqlservercdc_to_sqlserver_timestamp.conf | 57 ++++
15 files changed, 705 insertions(+), 39 deletions(-)
diff --git a/docs/en/connectors/source/Oracle-CDC.md
b/docs/en/connectors/source/Oracle-CDC.md
index 1bbc32a602..f5897736d0 100644
--- a/docs/en/connectors/source/Oracle-CDC.md
+++ b/docs/en/connectors/source/Oracle-CDC.md
@@ -231,7 +231,8 @@ exit;
| schema-names | List | No | - |
Schema name of the database to monitor.
[...]
| table-names | List | Yes | - |
Table name of the database to monitor. The table name needs to include the
database name, for example: `database_name.table_name`
[...]
| table-names-config | List | No | - |
Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys":
["key1"],"snapshotSplitColumn": "key2"}]
[...]
-| startup.mode | Enum | No | INITIAL |
Optional startup mode for Oracle CDC consumer, valid enumerations are
`initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize
historical data at startup, and then synchronize incremental data.<br/>
`earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup
from the latest offset.<br/> `specific`: Startup from user-supplied specific
offsets. [...]
+| startup.mode | Enum | No | INITIAL |
Optional startup mode for Oracle CDC consumer, valid enumerations are
`initial`, `earliest`, `latest`, `timestamp` and `specific`. <br/> `initial`:
Synchronize historical data at startup, and then synchronize incremental
data.<br/> `earliest`: Startup from the earliest offset possible.<br/>
`latest`: Startup from the latest offset.<br/> `specific`: Startup from
user-supplied specific offsets. [...]
+| startup.timestamp | Long | No | - | Start from the specified timestamp (milliseconds since Unix epoch). This timestamp is converted with `server-time-zone` when `startup.mode = timestamp`. **Note: this option is required when `startup.mode` is set to `timestamp`.**
[...]
| startup.specific-offset.file | String | No | - | Start from the specified binlog file name. **Note: this option is required when `startup.mode` is set to `specific`.**
[...]
| startup.specific-offset.pos | Long | No | - | Start from the specified binlog file position. **Note: this option is required when `startup.mode` is set to `specific`.**
[...]
| stop.mode | Enum | No | NEVER | Optional stop mode for Oracle CDC consumer, valid enumerations are `never`, `latest` or `specific`. <br/> `never`: Real-time jobs don't stop the source.<br/> `latest`: Stop from the latest offset.<br/> `specific`: Stop from user-supplied specific offset.
[...]
@@ -239,7 +240,7 @@ exit;
| stop.specific-offset.pos | Long | No | - | Stop from the specified binlog file position. **Note: this option is required when `stop.mode` is set to `specific`.**
[...]
| snapshot.split.size | Integer | No | 8096 | The split size (number of rows) of table snapshot; captured tables are split into multiple splits when reading the snapshot of the table.
[...]
| snapshot.fetch.size | Integer | No | 1024 | The maximum fetch size per poll when reading the table snapshot.
[...]
-| server-time-zone | String | No | UTC |
The session time zone in database server. If not set, then
ZoneId.systemDefault() is used to determine the server time zone.
[...]
+| server-time-zone | String | No | UTC | The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone. This value is also used when converting `startup.timestamp` to SCN. Set it explicitly when the database time zone and the JVM time zone differ.
[...]
| connect.timeout.ms | Duration | No | 30000 |
The maximum time that the connector should wait after trying to connect to the
database server before timing out.
[...]
| connect.max-retries | Integer | No | 3 | The maximum number of times the connector retries building a database server connection.
[...]
| connection.pool.size | Integer | No | 20 |
The jdbc connection pool size.
[...]
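As a sketch of how the new mode composes with the options above (connection details, credentials, and table names are placeholders, following the style of the existing examples in this doc):

```conf
source {
  Oracle-CDC {
    username = "system"
    password = "oracle"
    database-names = ["XE"]
    schema-names = ["DEBEZIUM"]
    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
    url = "jdbc:oracle:thin:@oracle-host:1521:xe"
    startup.mode = "timestamp"
    # Milliseconds since Unix epoch; converted to an SCN using server-time-zone
    startup.timestamp = 1700000000000
    server-time-zone = "UTC"
  }
}
```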
diff --git a/docs/en/connectors/source/SqlServer-CDC.md
b/docs/en/connectors/source/SqlServer-CDC.md
index eb91c0a0ea..90da3bf970 100644
--- a/docs/en/connectors/source/SqlServer-CDC.md
+++ b/docs/en/connectors/source/SqlServer-CDC.md
@@ -81,8 +81,8 @@ case-sensitive databases, make sure the configured identifier
case matches the d
| table-names | List | Yes | - |
Table name is a combination of schema name and table name
(databaseName.schemaName.tableName).
[...]
| table-names-config | List | No | - |
Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys":
["key1"],"snapshotSplitColumn": "key2"}]
[...]
| url | String | Yes | - |
URL has to be with database, like
"jdbc:sqlserver://localhost:1433;databaseName=test".
[...]
-| startup.mode | Enum | No | INITIAL |
Optional startup mode for SqlServer CDC consumer, valid enumerations are
"initial", "earliest", "latest" and "specific".
[...]
-| startup.timestamp | Long | No | - |
Start from the specified epoch timestamp (in milliseconds).<br/> **Note, This
option is required when** the **"startup.mode" option used `'timestamp'`.**
[...]
+| startup.mode | Enum | No | INITIAL |
Optional startup mode for SqlServer CDC consumer, valid enumerations are
"initial", "earliest", "latest", "timestamp" and "specific".
[...]
+| startup.timestamp | Long | No | - | Start from the specified epoch timestamp (in milliseconds). This timestamp is converted with `server-time-zone` when `startup.mode = timestamp`.<br/> **Note: this option is required when `startup.mode` is set to `'timestamp'`.**
[...]
| startup.specific-offset.file | String | No | - | Start from the specified binlog file name. <br/>**Note: this option is required when `startup.mode` is set to `'specific'`.**
[...]
| startup.specific-offset.pos | Long | No | - | Start from the specified binlog file position.<br/>**Note: this option is required when `startup.mode` is set to `'specific'`.**
[...]
| stop.mode | Enum | No | NEVER | Optional stop mode for SqlServer CDC consumer; the only valid enumeration is "never".
[...]
@@ -92,7 +92,7 @@ case-sensitive databases, make sure the configured identifier
case matches the d
| incremental.parallelism | Integer | No | 1 |
The number of parallel readers in the incremental phase.
[...]
| snapshot.split.size | Integer | No | 8096 | The split size (number of rows) of table snapshot; captured tables are split into multiple splits when reading the snapshot of the table.
[...]
| snapshot.fetch.size | Integer | No | 1024 | The maximum fetch size per poll when reading the table snapshot.
[...]
-| server-time-zone | String | No | UTC |
The session time zone in database server.
[...]
+| server-time-zone | String | No | UTC | The session time zone in database server. This value is also used when converting `startup.timestamp` to LSN. Set it explicitly when the database time zone and the JVM time zone differ.
[...]
| connect.timeout | Duration | No | 30s |
The maximum time that the connector should wait after trying to connect to the
database server before timing out.
[...]
| connect.max-retries | Integer | No | 3 | The maximum number of times the connector retries building a database server connection.
[...]
| connection.pool.size | Integer | No | 20 |
The connection pool size.
[...]
@@ -103,7 +103,7 @@ case-sensitive databases, make sure the configured
identifier case matches the d
| exactly_once | Boolean | No | false |
Enable exactly once semantic.
[...]
| debezium.* | config | No | - |
Pass-through Debezium's properties to Debezium Embedded Engine which is used to
capture data changes from SqlServer server.<br/>See more about<br/>the
[Debezium's SqlServer Connector
properties](https://github.com/debezium/debezium/blob/1.6/documentation/modules/ROOT/pages/connectors/sqlserver.adoc#connector-properties)
[...]
| format | Enum | No | DEFAULT | Optional output format for SqlServer CDC, valid enumerations are "DEFAULT", "COMPATIBLE_DEBEZIUM_JSON".
[...]
-| common-options | | no | - |
Source plugin common parameters, please refer to [Source Common
Options](../common-options/source-common-options.md) for details.
[...]
+| common-options | | no | - |
Source plugin common parameters, please refer to [Source Common
Options](../common-options/source-common-options.md) for details.
[...]
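A minimal sketch combining the new `timestamp` startup mode with the options above (host, credentials, and database/table names are placeholders):

```conf
source {
  SqlServer-CDC {
    username = "sa"
    password = "sqlserver_password"
    database-names = ["test"]
    table-names = ["test.dbo.full_types"]
    url = "jdbc:sqlserver://localhost:1433;databaseName=test"
    startup.mode = "timestamp"
    # Milliseconds since Unix epoch; converted to an LSN using server-time-zone
    startup.timestamp = 1700000000000
    server-time-zone = "UTC"
  }
}
```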
### Enable Sql Server CDC
diff --git a/docs/zh/connectors/source/Oracle-CDC.md
b/docs/zh/connectors/source/Oracle-CDC.md
index 25e7651534..e6795c8d96 100644
--- a/docs/zh/connectors/source/Oracle-CDC.md
+++ b/docs/zh/connectors/source/Oracle-CDC.md
@@ -2,21 +2,21 @@ import ChangeLog from '../changelog/connector-cdc-oracle.md';
# Oracle CDC
-> Oracle CDC 源连接器
+> Oracle CDC 数据源连接器
-## 支持这些引擎
+## 支持的引擎
> SeaTunnel Zeta<br/>
> Flink <br/>
## 关键特性
-- [ ] [批](../../introduction/concepts/connector-v2-features.md)
-- [x] [流](../../introduction/concepts/connector-v2-features.md)
+- [ ] [批处理](../../introduction/concepts/connector-v2-features.md)
+- [x] [流处理](../../introduction/concepts/connector-v2-features.md)
- [x] [精确一次](../../introduction/concepts/connector-v2-features.md)
- [ ] [列投影](../../introduction/concepts/connector-v2-features.md)
-- [x] [并行性](../../introduction/concepts/connector-v2-features.md)
-- [x] [支持用户自定义split](../../introduction/concepts/connector-v2-features.md)
+- [x] [并行度](../../introduction/concepts/connector-v2-features.md)
+- [x] [支持用户自定义拆分](../../introduction/concepts/connector-v2-features.md)
## 描述
@@ -24,36 +24,36 @@ Oracle CDC 连接器允许从 Oracle 数据库读取快照数据和增量数据
## 注意
-Debezium Oracle 连接器不依赖于连续挖掘选项。连接器负责检测日志切换并自动调整要挖掘的日志,这是连续挖掘选项为您自动执行的操作。
-因此,您不能在 debezium 中设置名为 `log.mining.continuous.mine` 的此属性。
+Debezium Oracle 连接器不依赖于连续挖掘(continuous
mining)选项。该连接器负责检测日志切换并自动调整正在挖掘的日志,这正是连续挖掘选项自动为您完成的工作。
+因此,您不能在 debezium 中设置名为 `log.mining.continuous.mine` 的属性。
## 支持的数据源信息
-| 数据源 | 支持的版本 | 驱动程序 | URL | Maven |
-|--------|-----------|---------|-----|-------|
-| Oracle | 不同的依赖版本有不同的驱动程序类。 | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@datasource01:1523:xe |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
+| 数据源 | 支持的版本 | 驱动类 |
Url | Maven
|
+|------------|----------------------------------------------------------|--------------------------|----------------------------------------|--------------------------------------------------------------------|
+| Oracle | 不同的依赖版本有不同的驱动类。 | oracle.jdbc.OracleDriver |
jdbc:oracle:thin:@datasource01:1523:xe |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
## 数据库依赖
-### 安装 JDBC 驱动程序
+### 安装 Jdbc 驱动
-#### 对于 Spark/Flink 引擎
+#### 适用于 Spark/Flink 引擎
-> 1. 您需要确保 [JDBC 驱动程序 jar
包](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) 已放置在目录
`${SEATUNNEL_HOME}/plugins/` 中。
-> 2. 为了支持 i18n 字符集,将 `orai18n.jar` 复制到 `$SEATUNNEL_HOME/plugins/` 目录。
+> 1. 您需要确保 [jdbc 驱动 jar
包](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) 已放置在
`${SEATUNNEL_HOME}/plugins/` 目录下。
+> 2. 为了支持 i18n 字符集,请将 `orai18n.jar` 复制到 `$SEATUNNEL_HOME/plugins/` 目录。
-#### 对于 SeaTunnel Zeta 引擎
+#### 适用于 SeaTunnel Zeta 引擎
-> 1. 您需要确保 [JDBC 驱动程序 jar
包](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) 已放置在目录
`${SEATUNNEL_HOME}/lib/` 中。
-> 2. 为了支持 i18n 字符集,将 `orai18n.jar` 复制到 `$SEATUNNEL_HOME/lib/` 目录。
+> 1. 您需要确保 [jdbc 驱动 jar
包](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) 已放置在
`${SEATUNNEL_HOME}/lib/` 目录下。
+> 2. 为了支持 i18n 字符集,请将 `orai18n.jar` 复制到 `$SEATUNNEL_HOME/lib/` 目录。
### 启用 Oracle Logminer
+> 要在 SeaTunnel 中使用 Logminer(Oracle 提供的内置工具)启用 Oracle CDC(变更数据捕获),请按照以下步骤操作:
-#### 在没有 CDB(容器数据库)模式的情况下启用 Logminer。
+#### 在非 CDB(容器数据库)模式下启用 Logminer。
-1. 操作系统创建一个空文件目录来存储 Oracle 归档日志和用户表空间。
+1. 操作系统创建一个空的目录来存储 Oracle 归档日志和用户表空间。
```shell
mkdir -p /opt/oracle/oradata/recovery_area
@@ -76,9 +76,295 @@ ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
archive log list;
```
-3. 以管理员身份登录并创建一个名为 logminer_user 的帐户,密码为 "oracle",并授予其读取表和日志的权限。
+3. 以管理员身份登录并创建一个名为 logminer_user 的账户,密码为 "oracle",并授予其读取表和日志的权限。
-## 变更日志
+```sql
+CREATE TABLESPACE logminer_tbs DATAFILE
'/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON
MAXSIZE UNLIMITED;
+CREATE USER logminer_user IDENTIFIED BY oracle DEFAULT TABLESPACE logminer_tbs
QUOTA UNLIMITED ON logminer_tbs;
-<ChangeLog />
+GRANT CREATE SESSION TO logminer_user;
+GRANT SELECT ON V_$DATABASE to logminer_user;
+GRANT SELECT ON V_$LOG TO logminer_user;
+GRANT SELECT ON V_$LOGFILE TO logminer_user;
+GRANT SELECT ON V_$LOGMNR_LOGS TO logminer_user;
+GRANT SELECT ON V_$LOGMNR_CONTENTS TO logminer_user;
+GRANT SELECT ON V_$ARCHIVED_LOG TO logminer_user;
+GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO logminer_user;
+GRANT EXECUTE ON DBMS_LOGMNR TO logminer_user;
+GRANT EXECUTE ON DBMS_LOGMNR_D TO logminer_user;
+GRANT SELECT ANY TRANSACTION TO logminer_user;
+GRANT SELECT ON V_$TRANSACTION TO logminer_user;
+```
+
+##### 注意:Oracle 11g 不支持以下命令
+
+```sql
+GRANT LOGMINING TO logminer_user;
+```
+
+##### 仅授予需要采集的表的权限
+
+```sql
+GRANT SELECT ANY TABLE TO logminer_user;
+GRANT ANALYZE ANY TO logminer_user;
+```
+
+#### 在 Oracle CDB (容器数据库) + PDB (可插拔数据库) 模式下启用 Logminer
+
+1. 操作系统创建一个空的目录来存储 Oracle 归档日志和用户表空间。
+
+```shell
+mkdir -p /opt/oracle/oradata/recovery_area
+mkdir -p /opt/oracle/oradata/ORCLCDB
+mkdir -p /opt/oracle/oradata/ORCLCDB/ORCLPDB1
+chown -R oracle /opt/oracle/***
+```
+
+2. 以管理员身份登录并启用日志记录
+
+```sql
+sqlplus /nolog
+connect sys as sysdba; # 密码: oracle
+alter system set db_recovery_file_dest_size = 10G;
+alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area'
scope=spfile;
+shutdown immediate
+startup mount
+alter database archivelog;
+alter database open;
+archive log list;
+```
+
+3. 在 CDB 中执行
+
+```sql
+ALTER TABLE TEST.* ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
+ALTER TABLE TEST.T2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
+```
+
+4. 创建 debezium 账户
+
+> 在 CDB 中操作
+
+```sql
+sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
+CREATE TABLESPACE logminer_tbs DATAFILE
'/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
+ SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
+exit;
+```
+
+> 在 PDB 中操作
+
+```sql
+sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
+ CREATE TABLESPACE logminer_tbs DATAFILE
'/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'
+ SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
+ exit;
+```
+
+5. 在 CDB 中操作
+
+```sql
+sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
+
+CREATE USER c##dbzuser IDENTIFIED BY dbz
+DEFAULT TABLESPACE logminer_tbs
+QUOTA UNLIMITED ON logminer_tbs
+CONTAINER=ALL;
+
+GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
+GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
+GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
+GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
+GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
+
+GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
+GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
+GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
+
+GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
+GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
+
+GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
+GRANT ANALYZE ANY TO c##dbzuser CONTAINER=ALL;
+
+exit;
+```
+
+## 数据类型映射
+
+| Oracle 数据类型
| SeaTunnel 数据类型 |
+|--------------------------------------------------------------------------------------|---------------------|
+| INTEGER
| INT |
+| FLOAT
| DECIMAL(38, 18) |
+| NUMBER(precision <= 9, scale == 0)
| INT |
+| NUMBER(9 < precision <= 18, scale == 0)
| BIGINT |
+| NUMBER(18 < precision, scale == 0)
| DECIMAL(38, 0) |
+| NUMBER(precision == 0, scale == 0)
| DECIMAL(38, 18) |
+| NUMBER(scale != 0)
| DECIMAL(38, 18) |
+| BINARY_DOUBLE
| DOUBLE |
+| BINARY_FLOAT<br/>REAL
| FLOAT |
+|
CHAR<br/>NCHAR<br/>NVARCHAR2<br/>VARCHAR2<br/>LONG<br/>ROWID<br/>NCLOB<br/>CLOB<br/>
| STRING |
+| DATE
| DATE |
+| TIMESTAMP<br/>TIMESTAMP WITH LOCAL TIME ZONE
| TIMESTAMP |
+| BLOB<br/>RAW<br/>LONG RAW<br/>BFILE
| BYTES |
+
+## 源端选项
+
+| 参数名称 | 类型 | 是否必选 | 默认值 | 描述
[...]
+|-------------------------------------------|----------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
[...]
+| url | String | 是 | - |
JDBC 连接的 URL。例如:`jdbc:oracle:thin:datasource01:1523:xe`。
[...]
+| username | String | 是 | - |
连接数据库服务器时使用的数据库用户名。
[...]
+| password | String | 是 | - |
连接数据库服务器时使用的数据库密码。
[...]
+| database-names | List | 否 | - |
要监控的数据库名称。
[...]
+| schema-names | List | 否 | - |
要监控的数据库 Schema 名称。
[...]
+| table-names | List | 是 | - |
要监控的数据库表名。表名需要包含数据库名,例如:`database_name.table_name`
[...]
+| table-names-config | List | 否 | - |
表配置列表。例如:`[{"table": "db1.schema1.table1","primaryKeys":
["key1"],"snapshotSplitColumn": "key2"}]`
[...]
+| startup.mode | Enum | 否 | INITIAL |
Oracle CDC 使用者的可选启动模式,有效枚举值为 `initial`、`earliest`、`latest`、`timestamp` 和
`specific`。<br/> `initial`:启动时同步历史数据,然后同步增量数据。<br/>
`earliest`:从尽可能早的偏移量启动。<br/> `latest`:从最新的偏移量启动。<br/> `specific`:从用户提供的特定偏移量启动。
|
+| startup.timestamp | Long | 否 | - |
从指定的时间戳(自 Unix 纪元以来的毫秒数)启动。当 `startup.mode = timestamp` 时,该时间戳会按
`server-time-zone` 转换。**注意,当 `startup.mode` 选项使用 `timestamp` 时,此选项是必需的。**
[...]
+| startup.specific-offset.file | String | 否 | - |
从指定的 binlog 文件名启动。**注意,当 `startup.mode` 选项使用 `specific` 时,此选项是必需的。**
[...]
+| startup.specific-offset.pos | Long | 否 | - |
从指定的 binlog 文件位置启动。**注意,当 `startup.mode` 选项使用 `specific` 时,此选项是必需的。**
[...]
+| stop.mode | Enum | 否 | NEVER |
Oracle CDC 使用者的可选停止模式,有效枚举值为 `never`、`latest` 或 `specific`。<br/>
`never`:实时任务不停止源。<br/> `latest`:从最新的偏移量停止。<br/> `specific`:从用户提供的特定偏移量停止。
[...]
+| stop.specific-offset.file | String | 否 | - |
从指定的 binlog 文件名停止。**注意,当 `stop.mode` 选项使用 `specific` 时,此选项是必需的。**
[...]
+| stop.specific-offset.pos | Long | 否 | - |
从指定的 binlog 文件位置停止。**注意,当 `stop.mode` 选项使用 `specific` 时,此选项是必需的。**
[...]
+| snapshot.split.size | Integer | 否 | 8096 |
表快照的拆分大小(行数),在读取表快照时,捕获的表将被拆分为多个拆分块。
[...]
+| snapshot.fetch.size | Integer | 否 | 1024 |
读取表快照时每次轮询的最大获取大小。
[...]
+| server-time-zone | String | 否 | UTC |
数据库服务器中的会话时区。如果未设置,则使用 ZoneId.systemDefault() 来确定服务器时区。该参数也用于将
`startup.timestamp` 转换为 SCN。若数据库时区与 JVM 时区不同,建议显式配置。
[...]
+| connect.timeout.ms | Duration | 否 | 30000 |
连接器在尝试连接数据库服务器后超时的最大等待时间。
[...]
+| connect.max-retries | Integer | 否 | 3 |
连接器尝试建立数据库服务器连接的最大重试次数。
[...]
+| connection.pool.size | Integer | 否 | 20 |
JDBC 连接池大小。
[...]
+| chunk-key.even-distribution.factor.upper-bound | Double | 否 | 100
| 分块键分布因子的上限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子小于或等于此上限(即 (MAX(id) - MIN(id) + 1) /
行数),则表分块将针对均匀分布进行优化。否则,如果分布因子较大,则表将被视为分布不均,如果估计的分片数超过
`sample-sharding.threshold` 指定的值,则将使用基于采样的分片策略。默认值为 100.0。 |
+| chunk-key.even-distribution.factor.lower-bound | Double | 否 | 0.05
| 分块键分布因子的下限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子大于或等于此下限(即 (MAX(id) - MIN(id) + 1) /
行数),则表分块将针对均匀分布进行优化。否则,如果分布因子较小,则表将被视为分布不均,如果估计的分片数超过
`sample-sharding.threshold` 指定的值,则将使用基于采样的分片策略。默认值为 0.05。 |
+| sample-sharding.threshold | Integer | 否 | 1000 |
此配置指定触发采样分片策略的预估分片数阈值。当分布因子超出 `chunk-key.even-distribution.factor.upper-bound`
和 `chunk-key.even-distribution.factor.lower-bound` 指定的范围,并且预估的分片数(计算为近似行数 /
分块大小)超过此阈值时,将使用采样分片策略。这有助于更有效地处理大型数据集。默认值为 1000 个分片。
|
+| inverse-sampling.rate | Integer | 否 | 1000 |
采样分片策略中使用的采样率的倒数。例如,如果此值设置为 1000,则意味着在采样过程中应用 1/1000
的采样率。此选项提供了控制采样粒度的灵活性,从而影响最终的分片数量。在处理首选较低采样率的极大型数据集时,它特别有用。默认值为 1000。
|
+| exactly_once | Boolean | 否 | false |
启用精确一次语义。
[...]
+| use_select_count | Boolean | 否 | false |
使用 `select count` 统计表行数,而不是在全量阶段使用其他方法。在这种情况下,当通过分析表使用 SQL 更新统计信息更快时,直接使用
`select count`。
[...]
+| skip_analyze | Boolean | 否 | false |
在全量阶段跳过表行数的分析。在这种情况下,您需要定期调度分析表 SQL 以更新相关表统计信息,或者您的表数据更改不频繁。
[...]
+| format | Enum | 否 | DEFAULT |
Oracle CDC 的可选输出格式,有效枚举值为 `DEFAULT`、`COMPATIBLE_DEBEZIUM_JSON`。
[...]
+| schema-changes.enabled | Boolean | 否 | false |
Schema 演进默认禁用。目前我们仅支持 `add column`、`drop column`、`rename column` 和 `modify
column`。
[...]
+| debezium | Config | 否 | - |
透传 [Debezium
属性](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/oracle.adoc#connector-properties)
给 Debezium Embedded Engine,该引擎用于捕获 Oracle 服务器的数据更改。
[...]
+| common-options | | 否 | - |
源端插件常用参数,详情请参阅 [源端常用选项](../common-options/source-common-options.md)。
[...]
+| decimal_type_narrowing | Boolean | 否 | true
| 数值类型收缩,如果为 true,则在不损失精度的情况下,将 decimal 类型收缩为 int 或 long 类型。目前仅支持
Oracle。请参阅下文的 `decimal_type_narrowing`。
[...]
+
+
+### decimal_type_narrowing
+
+数值类型收缩,如果为 true,则在不损失精度的情况下,将 decimal 类型收缩为 int 或 long 类型。目前仅支持 Oracle。
+
+例如:
+
+decimal_type_narrowing = true
+
+| Oracle | SeaTunnel |
+|---------------|-----------|
+| NUMBER(1, 0) | Boolean |
+| NUMBER(6, 0) | INT |
+| NUMBER(10, 0) | BIGINT |
+
+decimal_type_narrowing = false
+
+| Oracle | SeaTunnel |
+|---------------|----------------|
+| NUMBER(1, 0) | Decimal(1, 0) |
+| NUMBER(6, 0) | Decimal(6, 0) |
+| NUMBER(10, 0) | Decimal(10, 0) |
+
+## 任务示例
+
+### 简单示例
+
+> 支持多表读取
+
+```conf
+source {
+ # 这是一个示例源端插件,**仅用于测试和演示源端插件功能**
+ Oracle-CDC {
+ plugin_output = "customers"
+ username = "system"
+ password = "oracle"
+ database-names = ["XE"]
+ schema-names = ["DEBEZIUM"]
+ table-names = ["XE.DEBEZIUM.FULL_TYPES", "XE.DEBEZIUM.FULL_TYPES2"]
+ url = "jdbc:oracle:thin:@oracle-host:1521:xe"
+ source.reader.close.timeout = 120000
+ }
+}
+```
+
+> 在全量阶段使用 select count(*) 代替 analysis table 来统计表行数
+```conf
+source {
+# 这是一个示例源端插件,**仅用于测试和演示源端插件功能**
+ Oracle-CDC {
+ plugin_output = "customers"
+ use_select_count = true
+ username = "system"
+ password = "oracle"
+ database-names = ["XE"]
+ schema-names = ["DEBEZIUM"]
+ table-names = ["XE.DEBEZIUM.FULL_TYPES"]
+ url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+ source.reader.close.timeout = 120000
+ }
+}
+```
+
+> 使用 select NUM_ROWS from all_tables 获取表行数,但跳过 analyze table 操作。
+
+```conf
+source {
+# 这是一个示例源端插件,**仅用于测试和演示源端插件功能**
+ Oracle-CDC {
+ plugin_output = "customers"
+ skip_analyze = true
+ username = "system"
+ password = "oracle"
+ database-names = ["XE"]
+ schema-names = ["DEBEZIUM"]
+ table-names = ["XE.DEBEZIUM.FULL_TYPES"]
+ url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+ source.reader.close.timeout = 120000
+ }
+}
+```
+
+### 支持表的自定义主键
+
+```conf
+source {
+ Oracle-CDC {
+ plugin_output = "customers"
+ url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+ source.reader.close.timeout = 120000
+ username = "system"
+ password = "oracle"
+ database-names = ["XE"]
+ schema-names = ["DEBEZIUM"]
+ table-names = ["XE.DEBEZIUM.FULL_TYPES"]
+ table-names-config = [
+ {
+ table = "XE.DEBEZIUM.FULL_TYPES"
+ primaryKeys = ["ID"]
+ }
+ ]
+ }
+}
+```
+
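+### 使用 timestamp 启动模式

以下是一个使用本次提交新增的 `timestamp` 启动模式的假设示例(连接信息与表名为占位符),从指定时间戳开始同步增量数据:

```conf
source {
  Oracle-CDC {
    plugin_output = "customers"
    username = "system"
    password = "oracle"
    database-names = ["XE"]
    schema-names = ["DEBEZIUM"]
    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
    url = "jdbc:oracle:thin:@oracle-host:1521:xe"
    startup.mode = "timestamp"
    # 自 Unix 纪元以来的毫秒数,将按 server-time-zone 转换为 SCN
    startup.timestamp = 1700000000000
    server-time-zone = "UTC"
  }
}
```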
+### 支持以兼容 debezium 的格式发送到 kafka
+
+> 必须与 kafka 连接器 sink 配合使用,详情请参阅 [兼容 debezium
格式](../formats/cdc-compatible-debezium-json.md)
+
+## 更新日志
+
+<ChangeLog />
diff --git a/docs/zh/connectors/source/SqlServer-CDC.md
b/docs/zh/connectors/source/SqlServer-CDC.md
index 064c770008..5192bfaaab 100644
--- a/docs/zh/connectors/source/SqlServer-CDC.md
+++ b/docs/zh/connectors/source/SqlServer-CDC.md
@@ -79,8 +79,8 @@ Sql Server CDC 连接器允许从 SqlServer 数据库读取快照数据和增量
| table-names | List | 是 | -
| 表名是模式名和表名的组合 (databaseName.schemaName.tableName)。
|
| table-names-config | List | 否 | -
| 表配置列表。例如:[{"table": "db1.schema1.table1","primaryKeys":
["key1"],"snapshotSplitColumn": "key2"}]
|
| url | String | 是 | -
| URL 必须包含数据库,如 "jdbc:sqlserver://localhost:1433;databaseName=test"。
|
-| startup.mode | Enum | 否 |
INITIAL | SqlServer CDC 消费者的可选启动模式,有效枚举为 "initial"、"earliest"、"latest" 和
"specific"。
|
-| startup.timestamp | Long | 否 | -
| 从指定的纪元时间戳(以毫秒为单位)开始。<br/> **注意,当 "startup.mode" 选项使用 `'timestamp'`
时,此选项是必需的。**
|
+| startup.mode | Enum | 否 |
INITIAL | SqlServer CDC 消费者的可选启动模式,有效枚举为
"initial"、"earliest"、"latest"、"timestamp" 和 "specific"。
|
+| startup.timestamp | Long | 否 | -
| 从指定的纪元时间戳(以毫秒为单位)开始。当 `startup.mode = timestamp` 时,该时间戳会按 `server-time-zone`
转换。<br/> **注意,当 "startup.mode" 选项使用 `'timestamp'` 时,此选项是必需的。**
|
| startup.specific-offset.file | String | 否 | -
| 从指定的 binlog 文件名开始。<br/>**注意,当 "startup.mode" 选项使用 `'specific'` 时,此选项是必需的。**
|
| startup.specific-offset.pos | Long | 否 | -
| 从指定的 binlog 文件位置开始。<br/>**注意,当 "startup.mode" 选项使用 `'specific'` 时,此选项是必需的。**
|
| stop.mode | Enum | 否 | NEVER
| SqlServer CDC 消费者的可选停止模式,有效枚举为 "never"。
|
@@ -90,7 +90,7 @@ Sql Server CDC 连接器允许从 SqlServer 数据库读取快照数据和增量
| incremental.parallelism | Integer | 否 | 1
| 增量阶段中并行读取器的数量。
|
| snapshot.split.size | Integer | 否 | 8096
| 表快照的分割大小(行数),读取表快照时,捕获的表会被分割为多个分割。
|
| snapshot.fetch.size | Integer | 否 | 1024
| 读取表快照时每次轮询的最大获取大小。
|
-| server-time-zone | String | 否 | UTC
| 数据库服务器中的会话时区。
|
+| server-time-zone | String | 否 | UTC
| 数据库服务器中的会话时区。该参数也用于将 `startup.timestamp` 转换为 LSN。若数据库时区与 JVM 时区不同,建议显式配置。
|
| connect.timeout | Duration | 否 | 30s
| 连接器尝试连接到数据库服务器后在超时之前应该等待的最长时间。
|
| connect.max-retries | Integer | 否 | 3
| 连接器应该重试建立数据库服务器连接的最大重试次数。
|
| connection.pool.size | Integer | 否 | 20
| 连接池大小。
|
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceOptions.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceOptions.java
index a00fe6e83f..ec18a4aaee 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceOptions.java
@@ -34,11 +34,14 @@ public class OracleIncrementalSourceOptions extends
JdbcSourceOptions {
Options.key(SourceOptions.STARTUP_MODE_KEY)
.singleChoice(
StartupMode.class,
- Arrays.asList(StartupMode.INITIAL,
StartupMode.LATEST))
+ Arrays.asList(
+ StartupMode.INITIAL,
+ StartupMode.LATEST,
+ StartupMode.TIMESTAMP))
.defaultValue(StartupMode.INITIAL)
.withDescription(
"Optional startup mode for CDC source,
valid enumerations are "
- + "\"initial\", \"earliest\",
\"latest\", \"timestamp\"\n or \"specific\"");
+ + "\"initial\", \"latest\" or
\"timestamp\"");
public static final SingleChoiceOption<StopMode> STOP_MODE =
(SingleChoiceOption)
@@ -47,7 +50,7 @@ public class OracleIncrementalSourceOptions extends
JdbcSourceOptions {
.defaultValue(StopMode.NEVER)
.withDescription(
"Optional stop mode for CDC source, valid
enumerations are "
- + "\"never\", \"latest\",
\"timestamp\"\n or \"specific\"");
+ + "\"never\"");
public static final Option<List<String>> SCHEMA_NAMES =
Options.key("schema-names")
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/offset/RedoLogOffsetFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/offset/RedoLogOffsetFactory.java
index 2d85a94cac..e0b3a97eae 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/offset/RedoLogOffsetFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/offset/RedoLogOffsetFactory.java
@@ -74,6 +74,11 @@ public class RedoLogOffsetFactory extends OffsetFactory {
@Override
public Offset timestamp(long timestamp) {
- throw new UnsupportedOperationException("not supported create new
Offset by timestamp.");
+ try (JdbcConnection jdbcConnection =
dialect.openJdbcConnection(sourceConfig)) {
+ return OracleConnectionUtils.timestampToScn(
+ jdbcConnection, timestamp,
sourceConfig.getServerTimeZone());
+ } catch (Exception e) {
+ throw new RuntimeException("Convert timestamp to redoLog offset
error", e);
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleConnectionUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleConnectionUtils.java
index 125dfed314..04db02151d 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleConnectionUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleConnectionUtils.java
@@ -33,9 +33,11 @@ import io.debezium.relational.TableId;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.TimeZone;
import static io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX;
@@ -84,6 +86,48 @@ public class OracleConnectionUtils {
}
}
+ /**
+ * Convert timestamp (milliseconds since epoch) to Oracle SCN.
+ *
+ * @param jdbc JDBC connection
+ * @param timestampMs timestamp in milliseconds since epoch
+ * @param serverTimeZone database server time zone
+ * @return RedoLogOffset with the corresponding SCN
+ */
+ public static RedoLogOffset timestampToScn(
+ JdbcConnection jdbc, long timestampMs, String serverTimeZone) {
+ try {
+ String effectiveServerTimeZone =
+ serverTimeZone == null ? TimeZone.getDefault().getID() :
serverTimeZone;
+ LOG.info(
+ "Converting timestamp {} to SCN with server time zone {}",
+ timestampMs,
+ effectiveServerTimeZone);
+ String sql = "SELECT TIMESTAMP_TO_SCN(?) AS SCN FROM DUAL";
+ return jdbc.prepareQueryAndMap(
+ sql,
+ statement -> {
+ java.sql.Timestamp timestamp = new java.sql.Timestamp(timestampMs);
+ Calendar calendar =
+ Calendar.getInstance(TimeZone.getTimeZone(effectiveServerTimeZone));
+ statement.setTimestamp(1, timestamp, calendar);
+ },
+ rs -> {
+ if (rs.next()) {
+ final String scn = rs.getString(1);
+ LOG.info("Converted timestamp {} to SCN: {}",
timestampMs, scn);
+ return new
RedoLogOffset(Scn.valueOf(scn).longValue());
+ } else {
+ throw new SeaTunnelException(
+ "Cannot convert timestamp to SCN. Make
sure the specified timestamp is valid.");
+ }
+ });
+ } catch (SQLException e) {
+ LOG.error("Failed to convert timestamp to SCN", e);
+ throw new SeaTunnelException("Failed to convert timestamp to SCN",
e);
+ }
+ }
+
public static List<TableId> listTables(
JdbcConnection jdbcConnection, String database, RelationalTableFilters tableFilters)
throws SQLException {
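Aside (not part of this patch): `timestampToScn` above binds the epoch-millisecond timestamp through a `Calendar` in the configured server time zone before handing it to `TIMESTAMP_TO_SCN`. The standalone sketch below, with a hypothetical class name, illustrates why that zone choice matters: the same instant renders as different wall-clock values depending on the zone used for the bind.

```java
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class TimestampZoneSketch {
    // Render an epoch-millisecond instant as the wall-clock value a database
    // in the given zone would see; mirrors the Calendar-based bind above.
    public static String render(long epochMs, String zoneId) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        fmt.setTimeZone(TimeZone.getTimeZone(zoneId));
        return fmt.format(new Timestamp(epochMs));
    }

    public static void main(String[] args) {
        System.out.println(render(0L, "UTC"));           // 1970-01-01 00:00:00
        System.out.println(render(0L, "Asia/Shanghai")); // 1970-01-01 08:00:00
    }
}
```

A mismatched zone would shift the SCN lookup by the zone offset, which is why the code falls back to `TimeZone.getDefault()` only when `serverTimeZone` is unset.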
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceOptions.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceOptions.java
index 49d1ffb728..888ba18b33 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceOptions.java
@@ -34,6 +34,7 @@ public class SqlServerIncrementalSourceOptions extends JdbcSourceOptions {
Arrays.asList(
StartupMode.INITIAL,
StartupMode.EARLIEST,
+ StartupMode.TIMESTAMP,
StartupMode.LATEST))
.defaultValue(StartupMode.INITIAL)
.withDescription(
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/offset/LsnOffsetFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/offset/LsnOffsetFactory.java
index 201b7a1c80..f24075ebb8 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/offset/LsnOffsetFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/offset/LsnOffsetFactory.java
@@ -73,6 +73,13 @@ public class LsnOffsetFactory extends OffsetFactory {
@Override
public Offset timestamp(long timestamp) {
- throw new UnsupportedOperationException("not supported create new Offset by timestamp.");
+ try (JdbcConnection jdbcConnection = dialect.openJdbcConnection(sourceConfig)) {
+ return SqlServerUtils.timestampToLsn(
+ (SqlServerConnection) jdbcConnection,
+ timestamp,
+ sourceConfig.getServerTimeZone());
+ } catch (Exception e) {
+ throw new RuntimeException("Convert timestamp to LSN offset error", e);
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtils.java
index 2ad9bac901..5867a84c87 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtils.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtils.java
@@ -46,13 +46,16 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.TimeZone;
/** The utils for SqlServer data source. */
@Slf4j
@@ -282,6 +285,83 @@ public class SqlServerUtils {
}
}
+ /**
+ * Convert timestamp (in milliseconds) to LSN using SQL Server's sys.fn_cdc_map_time_to_lsn
+ * function.
+ *
+ * @param connection SQL Server connection
+ * @param timestampMs timestamp in milliseconds
+ * @param serverTimeZone database server time zone
+ * @return LsnOffset corresponding to the timestamp
+ */
+ public static LsnOffset timestampToLsn(
+ SqlServerConnection connection, long timestampMs, String serverTimeZone) {
+ try {
+ String effectiveServerTimeZone =
+ serverTimeZone == null ? TimeZone.getDefault().getID() : serverTimeZone;
+ String sql =
+ "SELECT sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', ?) AS lsn";
+
+ return connection.prepareQueryAndMap(
+ sql,
+ ps -> {
+ Timestamp timestamp = new Timestamp(timestampMs);
+ Calendar calendar =
+ Calendar.getInstance(TimeZone.getTimeZone(effectiveServerTimeZone));
+ ps.setTimestamp(1, timestamp, calendar);
+ },
+ rs -> {
+ if (!rs.next()) {
+ throw new SQLException(
+ String.format(
+ "No LSN found for timestamp %d
(%s)",
+ timestampMs, new
Timestamp(timestampMs)));
+ }
+ byte[] lsnBytes = rs.getBytes("lsn");
+ if (lsnBytes == null) {
+ throw new SQLException(
+ String.format(
+ "LSN is null for timestamp %d
(%s). "
+ + "This may indicate that
CDC is not enabled or the timestamp is too old.",
+ timestampMs, new
java.sql.Timestamp(timestampMs)));
+ }
+ Lsn lsn = Lsn.valueOf(lsnBytes);
+ log.info(
+ "Converted timestamp {} ({}) to LSN: {}",
+ timestampMs,
+ new Timestamp(timestampMs),
+ lsn);
+ return LsnOffset.valueOf(lsn.toString());
+ });
+ } catch (SQLException e) {
+ throw new SeaTunnelException(
+ String.format(
+ "Failed to convert timestamp %d (%s) to LSN: %s",
+ timestampMs, new Timestamp(timestampMs), e.getMessage()),
+ e);
+ }
+ }
+
+ /**
+ * Convert LSN string to LsnOffset.
+ *
+ * @param lsnString LSN string in format "00000027:00000a80:0003"
+ * @return LsnOffset
+ */
+ public static LsnOffset lsnStringToOffset(String lsnString) {
+ try {
+ // Validate LSN format
+ Lsn.valueOf(lsnString);
+ return LsnOffset.valueOf(lsnString);
+ } catch (Exception e) {
+ throw new SeaTunnelException(
+ String.format(
+ "Invalid LSN format: %s. Expected format:
00000027:00000a80:0003",
+ lsnString),
+ e);
+ }
+ }
+
/** Get split scan query for the given table. */
public static String buildSplitScanQuery(
TableId tableId, SeaTunnelRowType rowType, boolean isFirstSplit, boolean isLastSplit) {
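Aside (not part of this patch): `lsnStringToOffset` above validates input through `Lsn.valueOf`. As a standalone illustration of the textual layout it expects (three colon-separated hex groups of 8, 8, and 4 digits), a regex check could look like this; the class and method names are hypothetical:

```java
import java.util.regex.Pattern;

public class LsnFormatSketch {
    // Textual LSN layout from the example above, e.g. "00000027:00000a80:0003":
    // three colon-separated hexadecimal groups of 8, 8, and 4 digits.
    private static final Pattern LSN =
            Pattern.compile("[0-9a-fA-F]{8}:[0-9a-fA-F]{8}:[0-9a-fA-F]{4}");

    public static boolean looksLikeLsn(String s) {
        return s != null && LSN.matcher(s).matches();
    }

    public static void main(String[] args) {
        System.out.println(looksLikeLsn("00000027:00000a80:0003")); // true
        System.out.println(looksLikeLsn("invalid_lsn"));            // false
    }
}
```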
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtilsTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtilsTest.java
index c10897bfd3..fa0c606c5e 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtilsTest.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffset;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -70,4 +71,15 @@ public class SqlServerUtilsTest {
Assertions.assertEquals(
"SELECT * FROM [db1].[schema1].[table1] WHERE [id] >= ?",
splitScanSQL);
}
+
+ @Test
+ public void testLsnStringToOffset() {
+ String lsnString = "00000027:00000a80:0003";
+ LsnOffset offset = SqlServerUtils.lsnStringToOffset(lsnString);
+ Assertions.assertEquals(lsnString, offset.getCommitLsn().toString());
+
+ String invalidLsn = "invalid_lsn";
+ Assertions.assertThrows(
+ RuntimeException.class, () -> SqlServerUtils.lsnStringToOffset(invalidLsn));
+ }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index 0f53bc7dc5..adcacd89fa 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -41,6 +41,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -641,13 +642,69 @@ public class OracleCDCIT extends AbstractOracleCDCIT implements TestResource {
return String.format(SOURCE_SQL_TEMPLATE, database, tableName);
}
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
+ public void testTimestampStartupMode(TestContainer container) throws Exception {
+ clearTable(SCEHMA_NAME, SINK_TABLE1);
+ clearTable(SCEHMA_NAME, SOURCE_TABLE1);
+
+ insertRow(1, SCEHMA_NAME, SOURCE_TABLE1);
+
+ // sleep for a while to make sure the timestamp is different
+ TimeUnit.SECONDS.sleep(5);
+ long startTimestamp = System.currentTimeMillis();
+ TimeUnit.SECONDS.sleep(5);
+
+ insertRow(2, SCEHMA_NAME, SOURCE_TABLE1);
+
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ container.executeJob(
+ "/oraclecdc_to_oracle_timestamp.conf",
+ Arrays.asList("timestamp=" + startTimestamp));
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ return null;
+ });
+
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ List<List<Object>> sinkRows =
+ querySql(
+ "SELECT ID FROM "
+ + SCEHMA_NAME
+ + "."
+ + SINK_TABLE1
+ + " ORDER BY ID ASC");
+ Assertions.assertTrue(
+ sinkRows.stream()
+ .anyMatch(row -> row.get(0).toString().equals("2")));
+ Assertions.assertFalse(
+ sinkRows.stream()
+ .anyMatch(row -> row.get(0).toString().equals("1")));
+ });
+ }
+
private void insertSourceTable(String database, String tableName) {
+ insertRow(1, database, tableName);
+ }
+
+ private void insertRow(int id, String database, String tableName) {
executeSql(
"INSERT INTO "
+ database
+ "."
+ tableName
- + " VALUES (1, 'vc2', 'vc2', 'nvc2', 'c', 'nc',1.1,
2.22, 3.33, 8.888, 4.4444, 5.555, 6.66, 1234.567891, 1234.567891, 77.323,1, 22,
333, 4444, 5555, 1, 99, 1001, 999999999, 999999999999999999,94, 9949,
999999994, 999999999999999949,
99999999999999999999999999999999999949,TO_DATE('2022-10-30',
'yyyy-mm-dd'),TO_TIMESTAMP('2022-10-30 12:34:56.00789', 'yyyy-mm-dd
HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd
HH24:MI:SS.FF5'),TO_TIMESTAMP('2022 [...]
+ + " VALUES ("
+ + id
+ + ", 'vc2', 'vc2', 'nvc2', 'c', 'nc',1.1, 2.22, 3.33,
8.888, 4.4444, 5.555, 6.66, 1234.567891, 1234.567891, 77.323,1, 22, 333, 4444,
5555, 1, 99, 1001, 999999999, 999999999999999999,94, 9949, 999999994,
999999999999999949,
99999999999999999999999999999999999949,TO_DATE('2022-10-30',
'yyyy-mm-dd'),TO_TIMESTAMP('2022-10-30 12:34:56.00789', 'yyyy-mm-dd
HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd
HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12: [...]
}
private void updateSourceTable(String database, String tableName) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_timestamp.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_timestamp.conf
new file mode 100644
index 0000000000..a4daae48fc
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_timestamp.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ Oracle-CDC {
+ plugin_output = "customers"
+ username = "system"
+ password = "top_secret"
+ database-names = ["ORCLCDB"]
+ schema-names = ["DEBEZIUM"]
+ table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES"]
+ url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
+ source.reader.close.timeout = 120000
+ connection.pool.size = 1
+ startup.mode = "timestamp"
+ startup.timestamp = ${timestamp}
+ debezium {
+ database.oracle.jdbc.timezoneAsRegion = "false"
+ }
+ }
+}
+
+sink {
+ Jdbc {
+ plugin_input = "customers"
+ driver = "oracle.jdbc.driver.OracleDriver"
+ url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
+ username = "system"
+ password = "top_secret"
+ generate_sink_sql = true
+ database = "ORCLCDB"
+ table = "DEBEZIUM.SINK_FULL_TYPES"
+ batch_size = 1
+ primary_keys = ["ID"]
+ connection.pool.size = 1
+ }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index db3e750186..b70f60843b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -718,4 +718,60 @@ public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
querySql(selectSql, sinkTable));
});
}
+
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK},
+ disabledReason = "Currently SPARK do not support cdc")
+ public void testTimestampStartupMode(TestContainer container) throws InterruptedException {
+ initializeSqlServerTable(DATABASE_NAME);
+ executeSql("TRUNCATE TABLE " + DATABASE_NAME + "." + SCHEMA_NAME +
".full_types_sink;");
+
+ // Use full fields insert to avoid implicit conversion error for varbinary columns with null
+ // value
+ executeSql(
+ "INSERT INTO "
+ + SOURCE_TABLE_CUSTOM_PRIMARY_KEY
+ + " VALUES (1, 'cč1', 'vcč', 'tč', N'cč', N'vcč',
N'tč', 1.123, 2, 3.323, 4.323, 5.323, 6.323, 1, 22, 333, 4444, 55555,
'2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78',
'2018-07-13 14:23:45', '<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS
varbinary(100)), 5.32)");
+
+ // sleep for a while to make sure the timestamp is different
+ TimeUnit.SECONDS.sleep(5);
+ long startTimestamp = System.currentTimeMillis();
+ TimeUnit.SECONDS.sleep(5);
+
+ executeSql(
+ "INSERT INTO "
+ + SOURCE_TABLE_CUSTOM_PRIMARY_KEY
+ + " VALUES (2, 'cč2', 'vcč', 'tč', N'cč', N'vcč',
N'tč', 1.123, 2, 3.323, 4.323, 5.323, 6.323, 1, 22, 333, 4444, 55555,
'2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78',
'2018-07-13 14:23:45', '<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS
varbinary(100)), 5.32)");
+
+ CompletableFuture.runAsync(
+ () -> {
+ try {
+ container.executeJob(
+ "/sqlservercdc_to_sqlserver_timestamp.conf",
+ Arrays.asList("timestamp=" + startTimestamp));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ List<List<Object>> sinkRows =
+ querySql(
+ "SELECT id FROM "
+ + DATABASE_NAME
+ + "."
+ + SCHEMA_NAME
+ + ".full_types_sink ORDER
BY id ASC");
+ Assertions.assertTrue(
+ sinkRows.stream()
+ .anyMatch(row -> row.get(0).toString().equals("2")));
+ Assertions.assertFalse(
+ sinkRows.stream()
+ .anyMatch(row -> row.get(0).toString().equals("1")));
+ });
+ }
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_timestamp.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_timestamp.conf
new file mode 100644
index 0000000000..f0e178ebfb
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_timestamp.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
+
+source {
+ SqlServer-CDC {
+ plugin_output = "customers"
+ username = "sa"
+ password = "Password!"
+ database-names = ["column_type_test"]
+ table-names = ["column_type_test.dbo.full_types_custom_primary_key"]
+ url = "jdbc:sqlserver://sqlserver-host:1433;databaseName=column_type_test"
+ startup.mode = "timestamp"
+ startup.timestamp = ${timestamp}
+ exactly_once = true
+ table-names-config = [
+ {
+ table = "column_type_test.dbo.full_types_custom_primary_key"
+ primaryKeys = ["id"]
+ }
+ ]
+ }
+}
+
+sink {
+ Jdbc {
+ plugin_input = "customers"
+ driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+ url = "jdbc:sqlserver://sqlserver-host:1433;encrypt=false"
+ user = "sa"
+ password = "Password!"
+ generate_sink_sql = true
+ database = "column_type_test"
+ table = "dbo.full_types_sink"
+ batch_size = 1
+ primary_keys = ["id"]
+ }
+}