This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 34a57c70bc [Feature][Connectors-v2] Support timestamp startup mode for 
Oracle and SQLServer CDC (#10428)
34a57c70bc is described below

commit 34a57c70bc06e13e0d2e07cdcda7f3d67ba2e559
Author: panda <[email protected]>
AuthorDate: Thu Mar 5 20:38:40 2026 +0800

    [Feature][Connectors-v2] Support timestamp startup mode for Oracle and 
SQLServer CDC (#10428)
---
 docs/en/connectors/source/Oracle-CDC.md            |   5 +-
 docs/en/connectors/source/SqlServer-CDC.md         |   8 +-
 docs/zh/connectors/source/Oracle-CDC.md            | 334 +++++++++++++++++++--
 docs/zh/connectors/source/SqlServer-CDC.md         |   6 +-
 .../source/OracleIncrementalSourceOptions.java     |   9 +-
 .../oracle/source/offset/RedoLogOffsetFactory.java |   7 +-
 .../cdc/oracle/utils/OracleConnectionUtils.java    |  44 +++
 .../source/SqlServerIncrementalSourceOptions.java  |   1 +
 .../sqlserver/source/offset/LsnOffsetFactory.java  |   9 +-
 .../cdc/sqlserver/utils/SqlServerUtils.java        |  80 +++++
 .../cdc/sqlserver/utils/SqlServerUtilsTest.java    |  12 +
 .../seatunnel/cdc/oracle/OracleCDCIT.java          |  59 +++-
 .../resources/oraclecdc_to_oracle_timestamp.conf   |  57 ++++
 .../connector/cdc/sqlserver/SqlServerCDCIT.java    |  56 ++++
 .../sqlservercdc_to_sqlserver_timestamp.conf       |  57 ++++
 15 files changed, 705 insertions(+), 39 deletions(-)

diff --git a/docs/en/connectors/source/Oracle-CDC.md 
b/docs/en/connectors/source/Oracle-CDC.md
index 1bbc32a602..f5897736d0 100644
--- a/docs/en/connectors/source/Oracle-CDC.md
+++ b/docs/en/connectors/source/Oracle-CDC.md
@@ -231,7 +231,8 @@ exit;
 | schema-names                              | List     | No       | -       | 
Schema name of the database to monitor.                                         
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
 | table-names                               | List     | Yes      | -       | 
Table name of the database to monitor. The table name needs to include the 
database name, for example: `database_name.table_name`                          
                                                                                
                                                                                
                                                                                
                    [...]
 | table-names-config                        | List     | No       | -       | 
Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys": 
["key1"],"snapshotSplitColumn": "key2"}]                                        
                                                                                
                                                                                
                                                                                
                [...]
-| startup.mode                              | Enum     | No       | INITIAL | 
Optional startup mode for Oracle CDC consumer, valid enumerations are 
`initial`, `earliest`, `latest` and `specific`. <br/> `initial`: Synchronize 
historical data at startup, and then synchronize incremental data.<br/> 
`earliest`: Startup from the earliest offset possible.<br/> `latest`: Startup 
from the latest offset.<br/> `specific`: Startup from user-supplied specific 
offsets.                                 [...]
+| startup.mode                              | Enum     | No       | INITIAL | 
Optional startup mode for Oracle CDC consumer, valid enumerations are 
`initial`, `earliest`, `latest`, `timestamp` and `specific`. <br/> `initial`: 
Synchronize historical data at startup, and then synchronize incremental 
data.<br/> `earliest`: Startup from the earliest offset possible.<br/> 
`latest`: Startup from the latest offset.<br/> `specific`: Startup from 
user-supplied specific offsets.                    [...]
+| startup.timestamp                         | Long     | No       | -       | 
Start from the specified timestamp (milliseconds since Unix epoch). This 
timestamp is converted with `server-time-zone` when `startup.mode = timestamp`. 
**Note, This option is required when the `startup.mode` option used 
`timestamp`.**                                                                  
                                                                                
                                  [...]
 | startup.specific-offset.file              | String   | No       | -       | 
Start from the specified binlog file name. **Note, This option is required when 
the `startup.mode` option used `specific`.**                                    
                                                                                
                                                                                
                                                                                
               [...]
 | startup.specific-offset.pos               | Long     | No       | -       | 
Start from the specified binlog file position. **Note, This option is required 
when the `startup.mode` option used `specific`.**                               
                                                                                
                                                                                
                                                                                
                [...]
 | stop.mode                                 | Enum     | No       | NEVER   | 
Optional stop mode for Oracle CDC consumer, valid enumerations are `never`, 
`latest` or `specific`. <br/> `never`: Real-time job don't stop the 
source.<br/> `latest`: Stop from the latest offset.<br/> `specific`: Stop from 
user-supplied specific offset.                                                  
                                                                                
                                [...]
@@ -239,7 +240,7 @@ exit;
 | stop.specific-offset.pos                  | Long     | No       | -       | 
Stop from the specified binlog file position. **Note, This option is required 
when the `stop.mode` option used `specific`.**                                  
                                                                                
                                                                                
                                                                                
                 [...]
 | snapshot.split.size                       | Integer  | No       | 8096    | 
The split size (number of rows) of table snapshot, captured tables are split 
into multiple splits when read the snapshot of table.                           
                                                                                
                                                                                
                                                                                
                  [...]
 | snapshot.fetch.size                       | Integer  | No       | 1024    | 
The maximum fetch size for per poll when read table snapshot.                   
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
-| server-time-zone                          | String   | No       | UTC     | 
The session time zone in database server. If not set, then 
ZoneId.systemDefault() is used to determine the server time zone.               
                                                                                
                                                                                
                                                                                
                                    [...]
+| server-time-zone                          | String   | No       | UTC     | 
The session time zone in database server. If not set, then 
ZoneId.systemDefault() is used to determine the server time zone. This value is 
also used when converting `startup.timestamp` to SCN. Set it explicitly when 
database time zone and JVM time zone are different.                             
                                                                                
                                       [...]
 | connect.timeout.ms                        | Duration | No       | 30000   | 
The maximum time that the connector should wait after trying to connect to the 
database server before timing out.                                              
                                                                                
                                                                                
                                                                                
                [...]
 | connect.max-retries                       | Integer  | No       | 3       | 
The max retry times that the connector should retry to build database server 
connection.                                                                     
                                                                                
                                                                                
                                                                                
                  [...]
 | connection.pool.size                      | Integer  | No       | 20      | 
The jdbc connection pool size.                                                  
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
diff --git a/docs/en/connectors/source/SqlServer-CDC.md 
b/docs/en/connectors/source/SqlServer-CDC.md
index eb91c0a0ea..90da3bf970 100644
--- a/docs/en/connectors/source/SqlServer-CDC.md
+++ b/docs/en/connectors/source/SqlServer-CDC.md
@@ -81,8 +81,8 @@ case-sensitive databases, make sure the configured identifier 
case matches the d
 | table-names                               | List     | Yes      | -       | 
Table name is a combination of schema name and table name 
(databaseName.schemaName.tableName).                                            
                                                                                
                                                                                
                                                                                
                                     [...]
 | table-names-config                        | List     | No       | -       | 
Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys": 
["key1"],"snapshotSplitColumn": "key2"}]                                        
                                                                                
                                                                                
                                                                                
                [...]
 | url                                       | String   | Yes      | -       | 
URL has to be with database, like 
"jdbc:sqlserver://localhost:1433;databaseName=test".                            
                                                                                
                                                                                
                                                                                
                                                             [...]
-| startup.mode                              | Enum     | No       | INITIAL | 
Optional startup mode for SqlServer CDC consumer, valid enumerations are 
"initial", "earliest", "latest" and "specific".                                 
                                                                                
                                                                                
                                                                                
                      [...]
-| startup.timestamp                         | Long     | No       | -       | 
Start from the specified epoch timestamp (in milliseconds).<br/> **Note, This 
option is required when** the **"startup.mode" option used `'timestamp'`.**     
                                                                                
                                                                                
                                                                                
                 [...]
+| startup.mode                              | Enum     | No       | INITIAL | 
Optional startup mode for SqlServer CDC consumer, valid enumerations are 
"initial", "earliest", "latest", "timestamp" and "specific".                    
                                                                                
                                                                                
                                                                                
                      [...]
+| startup.timestamp                         | Long     | No       | -       | 
Start from the specified epoch timestamp (in milliseconds). This timestamp is 
converted with `server-time-zone` when `startup.mode = timestamp`.<br/> **Note, 
This option is required when** the **"startup.mode" option used 
`'timestamp'`.**                                                                
                                                                                
                                 [...]
 | startup.specific-offset.file              | String   | No       | -       | 
Start from the specified binlog file name. <br/>**Note, This option is required 
when the "startup.mode" option used `'specific'`.**                             
                                                                                
                                                                                
                                                                                
               [...]
 | startup.specific-offset.pos               | Long     | No       | -       | 
Start from the specified binlog file position.<br/>**Note, This option is 
required when the "startup.mode" option used `'specific'`.**                    
                                                                                
                                                                                
                                                                                
                     [...]
 | stop.mode                                 | Enum     | No       | NEVER   | 
Optional stop mode for SqlServer CDC consumer, valid enumerations are "never".  
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
@@ -92,7 +92,7 @@ case-sensitive databases, make sure the configured identifier 
case matches the d
 | incremental.parallelism                   | Integer  | No       | 1       | 
The number of parallel readers in the incremental phase.                        
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
 | snapshot.split.size                       | Integer  | No       | 8096    | 
The split size (number of rows) of table snapshot, captured tables are split 
into multiple splits when read the snapshotof table.                            
                                                                                
                                                                                
                                                                                
                  [...]
 | snapshot.fetch.size                       | Integer  | No       | 1024    | 
The maximum fetch size for per poll when read table snapshot.                   
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
-| server-time-zone                          | String   | No       | UTC     | 
The session time zone in database server.                                       
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
+| server-time-zone                          | String   | No       | UTC     | 
The session time zone in database server. This value is also used when 
converting `startup.timestamp` to LSN. Set it explicitly when database time 
zone and JVM time zone are different.                                           
                                                                                
                                                                                
                            [...]
 | connect.timeout                           | Duration | No       | 30s     | 
The maximum time that the connector should wait after trying to connect to the 
database server before timing out.                                              
                                                                                
                                                                                
                                                                                
                [...]
 | connect.max-retries                       | Integer  | No       | 3       | 
The max retry times that the connector should retry to build database server 
connection.                                                                     
                                                                                
                                                                                
                                                                                
                  [...]
 | connection.pool.size                      | Integer  | No       | 20      | 
The connection pool size.                                                       
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
@@ -103,7 +103,7 @@ case-sensitive databases, make sure the configured 
identifier case matches the d
 | exactly_once                              | Boolean  | No       | false   | 
Enable exactly once semantic.                                                   
                                                                                
                                                                                
                                                                                
                                                                                
               [...]
 | debezium.*                                | config   | No       | -       | 
Pass-through Debezium's properties to Debezium Embedded Engine which is used to 
capture data changes from SqlServer server.<br/>See more about<br/>the 
[Debezium's SqlServer Connector 
properties](https://github.com/debezium/debezium/blob/1.6/documentation/modules/ROOT/pages/connectors/sqlserver.adoc#connector-properties)
                                                                                
              [...]
 | format                                    | Enum     | No       | DEFAULT | 
Optional output format for SqlServer CDC, valid enumerations are 
"DEFAULT"、"COMPATIBLE_DEBEZIUM_JSON".                                           
                                                                                
                                                                                
                                                                                
                              [...]
-| common-options                            |          | no       | -       | 
Source plugin common parameters, please refer to [Source Common 
Options](../common-options/source-common-options.md) for details.               
                                                                                
                                                                                
                                                                                
                               [...]
+| common-options                            |          | no       | -       | 
Source plugin common parameters, please refer to [Source Common 
Options](../common-options/source-common-options.md) for details.               
                                                                                
                                                                                
                                                                                
                               [...]
 
 ### Enable Sql Server CDC
 
diff --git a/docs/zh/connectors/source/Oracle-CDC.md 
b/docs/zh/connectors/source/Oracle-CDC.md
index 25e7651534..e6795c8d96 100644
--- a/docs/zh/connectors/source/Oracle-CDC.md
+++ b/docs/zh/connectors/source/Oracle-CDC.md
@@ -2,21 +2,21 @@ import ChangeLog from '../changelog/connector-cdc-oracle.md';
 
 # Oracle CDC
 
-> Oracle CDC 源连接器
+> Oracle CDC 数据源连接器
 
-## 支持这些引擎
+## 支持的引擎
 
 > SeaTunnel Zeta<br/>
 > Flink <br/>
 
 ## 关键特性
 
-- [ ] [批](../../introduction/concepts/connector-v2-features.md)
-- [x] [流](../../introduction/concepts/connector-v2-features.md)
+- [ ] [批处理](../../introduction/concepts/connector-v2-features.md)
+- [x] [流处理](../../introduction/concepts/connector-v2-features.md)
 - [x] [精确一次](../../introduction/concepts/connector-v2-features.md)
 - [ ] [列投影](../../introduction/concepts/connector-v2-features.md)
-- [x] [并行性](../../introduction/concepts/connector-v2-features.md)
-- [x] [支持用户自定义split](../../introduction/concepts/connector-v2-features.md)
+- [x] [并行度](../../introduction/concepts/connector-v2-features.md)
+- [x] [支持用户自定义拆分](../../introduction/concepts/connector-v2-features.md)
 
 ## 描述
 
@@ -24,36 +24,36 @@ Oracle CDC 连接器允许从 Oracle 数据库读取快照数据和增量数据
 
 ## 注意
 
-Debezium Oracle 连接器不依赖于连续挖掘选项。连接器负责检测日志切换并自动调整要挖掘的日志,这是连续挖掘选项为您自动执行的操作。
-因此,您不能在 debezium 中设置名为 `log.mining.continuous.mine` 的此属性。
+Debezium Oracle 连接器不依赖于连续挖掘(continuous 
mining)选项。该连接器负责检测日志切换并自动调整正在挖掘的日志,这正是连续挖掘选项自动为您完成的工作。
+因此,您不能在 debezium 中设置名为 `log.mining.continuous.mine` 的属性。
 
 ## 支持的数据源信息
 
-| 数据源 | 支持的版本 | 驱动程序 | URL | Maven |
-|--------|-----------|---------|-----|-------|
-| Oracle | 不同的依赖版本有不同的驱动程序类。 | oracle.jdbc.OracleDriver | 
jdbc:oracle:thin:@datasource01:1523:xe | 
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
+| 数据源 |                    支持的版本                    |          驱动类          |  
                Url                   |                               Maven     
                           |
+|------------|----------------------------------------------------------|--------------------------|----------------------------------------|--------------------------------------------------------------------|
+| Oracle     | 不同的依赖版本有不同的驱动类。 | oracle.jdbc.OracleDriver | 
jdbc:oracle:thin:@datasource01:1523:xe | 
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
 
 ## 数据库依赖
 
-### 安装 JDBC 驱动程序
+### 安装 Jdbc 驱动
 
-#### 对于 Spark/Flink 引擎
+#### 适用于 Spark/Flink 引擎
 
-> 1. 您需要确保 [JDBC 驱动程序 jar 
包](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) 已放置在目录 
`${SEATUNNEL_HOME}/plugins/` 中。
-> 2. 为了支持 i18n 字符集,将 `orai18n.jar` 复制到 `$SEATUNNEL_HOME/plugins/` 目录。
+> 1. 您需要确保 [jdbc 驱动 jar 
包](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) 已放置在 
`${SEATUNNEL_HOME}/plugins/` 目录下。
+> 2. 为了支持 i18n 字符集,请将 `orai18n.jar` 复制到 `$SEATUNNEL_HOME/plugins/` 目录。
 
-#### 对于 SeaTunnel Zeta 引擎
+#### 适用于 SeaTunnel Zeta 引擎
 
-> 1. 您需要确保 [JDBC 驱动程序 jar 
包](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) 已放置在目录 
`${SEATUNNEL_HOME}/lib/` 中。
-> 2. 为了支持 i18n 字符集,将 `orai18n.jar` 复制到 `$SEATUNNEL_HOME/lib/` 目录。
+> 1. 您需要确保 [jdbc 驱动 jar 
包](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) 已放置在 
`${SEATUNNEL_HOME}/lib/` 目录下。
+> 2. 为了支持 i18n 字符集,请将 `orai18n.jar` 复制到 `$SEATUNNEL_HOME/lib/` 目录。
 
 ### 启用 Oracle Logminer
 
-> 要在 Seatunnel 中启用 Oracle CDC(变更数据捕获)使用 Logminer(这是 Oracle 提供的内置工具),请按照以下步骤操作:
+> 要在 Seatunnel 中使用 Logminer(Oracle 提供的内置工具)启用 Oracle CDC(变更数据捕获),请按照以下步骤操作:
 
-#### 在没有 CDB(容器数据库)模式的情况下启用 Logminer。
+#### 在非 CDB(容器数据库)模式下启用 Logminer。
 
-1. 操作系统创建一个空文件目录来存储 Oracle 归档日志和用户表空间。
+1. 操作系统创建一个空的目录来存储 Oracle 归档日志和用户表空间。
 
 ```shell
 mkdir -p /opt/oracle/oradata/recovery_area
@@ -76,9 +76,295 @@ ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
 archive log list;
 ```
 
-3. 以管理员身份登录并创建一个名为 logminer_user 的帐户,密码为 "oracle",并授予其读取表和日志的权限。
+3. 以管理员身份登录并创建一个名为 logminer_user 的账户,密码为 "oracle",并授予其读取表和日志的权限。
 
-## 变更日志
+```sql
+CREATE TABLESPACE logminer_tbs DATAFILE 
'/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON 
MAXSIZE UNLIMITED;
+CREATE USER logminer_user IDENTIFIED BY oracle DEFAULT TABLESPACE logminer_tbs 
QUOTA UNLIMITED ON logminer_tbs;
 
-<ChangeLog />
+GRANT CREATE SESSION TO logminer_user;
+GRANT SELECT ON V_$DATABASE to logminer_user;
+GRANT SELECT ON V_$LOG TO logminer_user;
+GRANT SELECT ON V_$LOGFILE TO logminer_user;
+GRANT SELECT ON V_$LOGMNR_LOGS TO logminer_user;
+GRANT SELECT ON V_$LOGMNR_CONTENTS TO logminer_user;
+GRANT SELECT ON V_$ARCHIVED_LOG TO logminer_user;
+GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO logminer_user;
+GRANT EXECUTE ON DBMS_LOGMNR TO logminer_user;
+GRANT EXECUTE ON DBMS_LOGMNR_D TO logminer_user;
+GRANT SELECT ANY TRANSACTION TO logminer_user;
+GRANT SELECT ON V_$TRANSACTION TO logminer_user;
+```
+
+##### 注意:Oracle 11g 不支持以下命令
+
+```sql
+GRANT LOGMINING TO logminer_user;
+```
+
+##### 仅授予需要采集的表的权限
+
+```sql
+GRANT SELECT ANY TABLE TO logminer_user;
+GRANT ANALYZE ANY TO logminer_user;
+```
+
+#### 在 Oracle CDB (容器数据库) + PDB (可插拔数据库) 模式下启用 Logminer
+
+1. 操作系统创建一个空的目录来存储 Oracle 归档日志和用户表空间。
+
+```shell
+mkdir -p /opt/oracle/oradata/recovery_area
+mkdir -p /opt/oracle/oradata/ORCLCDB
+mkdir -p /opt/oracle/oradata/ORCLCDB/ORCLPDB1
+chown -R oracle /opt/oracle/***
+```
+
+2. 以管理员身份登录并启用日志记录
+
+```sql
+sqlplus /nolog
+connect sys as sysdba; # 密码: oracle
+alter system set db_recovery_file_dest_size = 10G;
+alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' 
scope=spfile;
+shutdown immediate
+startup mount
+alter database archivelog;
+alter database open;
+archive log list;
+```
+
+3. 在 CDB 中执行
+
+```sql
+ALTER TABLE TEST.* ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
+ALTER TABLE TEST.T2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
+```
+
+4. 创建 debeziume 账户
+
+> 在 CDB 中操作
+
+```sql
+sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
+CREATE TABLESPACE logminer_tbs DATAFILE 
'/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf'
+ SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
+exit;
+```
+
+> 在 PDB 中操作
+
+```sql
+sqlplus sys/top_secret@//localhost:1521/ORCLPDB1 as sysdba
+ CREATE TABLESPACE logminer_tbs DATAFILE 
'/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf'
+   SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
+ exit;
+```
+
+5. 在 CDB 中操作
+
+```sql
+sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba
+
+CREATE USER c##dbzuser IDENTIFIED BY dbz
+DEFAULT TABLESPACE logminer_tbs
+QUOTA UNLIMITED ON logminer_tbs
+CONTAINER=ALL;
+
+GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
+GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
+GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
+GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
+GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
+
+GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
+GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
+GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
+
+GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
+GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
+
+GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
+GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
+GRANT analyze any TO debeziume_1 CONTAINER=ALL;
+
+exit;
+```
+
+## 数据类型映射
+
+|                                   Oracle 数据类型                                
   | SeaTunnel 数据类型 |
+|--------------------------------------------------------------------------------------|---------------------|
+| INTEGER                                                                      
        | INT                 |
+| FLOAT                                                                        
        | DECIMAL(38, 18)     |
+| NUMBER(precision <= 9, scale == 0)                                           
        | INT                 |
+| NUMBER(9 < precision <= 18, scale == 0)                                      
        | BIGINT              |
+| NUMBER(18 < precision, scale == 0)                                           
        | DECIMAL(38, 0)      |
+| NUMBER(precision == 0, scale == 0)                                           
        | DECIMAL(38, 18)     |
+| NUMBER(scale != 0)                                                           
        | DECIMAL(38, 18)     |
+| BINARY_DOUBLE                                                                
        | DOUBLE              |
+| BINARY_FLOAT<br/>REAL                                                        
        | FLOAT               |
+| 
CHAR<br/>NCHAR<br/>NVARCHAR2<br/>VARCHAR2<br/>LONG<br/>ROWID<br/>NCLOB<br/>CLOB<br/>
 | STRING              |
+| DATE                                                                         
        | DATE                |
+| TIMESTAMP<br/>TIMESTAMP WITH LOCAL TIME ZONE                                 
        | TIMESTAMP           |
+| BLOB<br/>RAW<br/>LONG RAW<br/>BFILE                                          
        | BYTES               |
+
+## 源端选项
+
+|                      参数名称                 |   类型   | 是否必选 | 默认值 | 描述         
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+|-------------------------------------------|----------|----------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| url                                       | String   | 是      | -       | 
JDBC 连接的 URL。例如:`jdbc:oracle:thin:datasource01:1523:xe`。                        
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
+| username                                  | String   | 是      | -       | 
连接数据库服务器时使用的数据库用户名。                                                             
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
+| password                                  | String   | 是      | -       | 
连接数据库服务器时使用的数据库密码。                                                              
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
+| database-names                            | List     | 否       | -       | 
要监控的数据库名称。                                                                      
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| schema-names                              | List     | 否       | -       | 
要监控的数据库 Schema 名称。                                                              
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| table-names                               | List     | 是      | -       | 
要监控的数据库表名。表名需要包含数据库名,例如:`database_name.table_name`                              
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
+| table-names-config                        | List     | 否       | -       | 
表配置列表。例如:`[{"table": "db1.schema1.table1","primaryKeys": 
["key1"],"snapshotSplitColumn": "key2"}]`                                       
                                                                                
                                                                                
                                                                                
                                       [...]
+| startup.mode                              | Enum     | 否       | INITIAL | 
Oracle CDC 使用者的可选启动模式,有效枚举值为 `initial`、`earliest`、`latest`、`timestamp` 和 
`specific`。<br/> `initial`:启动时同步历史数据,然后同步增量数据。<br/> 
`earliest`:从尽可能早的偏移量启动。<br/> `latest`:从最新的偏移量启动。<br/> `specific`:从用户提供的特定偏移量启动。 
                                                                                
                                                                                
                                         |
+| startup.timestamp                         | Long     | 否       | -       | 
从指定的时间戳(自 Unix 纪元以来的毫秒数)启动。当 `startup.mode = timestamp` 时,该时间戳会按 
`server-time-zone` 转换。**注意,当 `startup.mode` 选项使用 `timestamp` 时,此选项是必需的。**       
                                                                                
                                                                                
                                                                                
                               [...]
+| startup.specific-offset.file              | String   | 否       | -       | 
从指定的 binlog 文件名启动。**注意,当 `startup.mode` 选项使用 `specific` 时,此选项是必需的。**            
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| startup.specific-offset.pos               | Long     | 否       | -       | 
从指定的 binlog 文件位置启动。**注意,当 `startup.mode` 选项使用 `specific` 时,此选项是必需的。**           
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| stop.mode                                 | Enum     | 否       | NEVER   | 
Oracle CDC 使用者的可选停止模式,有效枚举值为 `never`、`latest` 或 `specific`。<br/> 
`never`:实时任务不停止源。<br/> `latest`:从最新的偏移量停止。<br/> `specific`:从用户提供的特定偏移量停止。       
                                                                                
                                                                                
                                                                                
                               [...]
+| stop.specific-offset.file                 | String   | 否       | -       | 
从指定的 binlog 文件名停止。**注意,当 `stop.mode` 选项使用 `specific` 时,此选项是必需的。**               
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| stop.specific-offset.pos                  | Long     | 否       | -       | 
从指定的 binlog 文件位置停止。**注意,当 `stop.mode` 选项使用 `specific` 时,此选项是必需的。**              
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| snapshot.split.size                       | Integer  | 否       | 8096    | 
表快照的拆分大小(行数),在读取表快照时,捕获的表将被拆分为多个拆分块。                                            
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| snapshot.fetch.size                       | Integer  | 否       | 1024    | 
读取表快照时每次轮询的最大获取大小。                                                              
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| server-time-zone                          | String   | 否       | UTC     | 
数据库服务器中的会话时区。如果未设置,则使用 ZoneId.systemDefault() 来确定服务器时区。该参数也用于将 
`startup.timestamp` 转换为 SCN。若数据库时区与 JVM 时区不同,建议显式配置。                            
                                                                                
                                                                                
                                                                                
                                 [...]
+| connect.timeout.ms                        | Duration | 否       | 30000   | 
连接器在尝试连接数据库服务器后超时的最大等待时间。                                                       
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| connect.max-retries                       | Integer  | 否       | 3       | 
连接器尝试建立数据库服务器连接的最大重试次数。                                                         
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| connection.pool.size                      | Integer  | 否       | 20      | 
JDBC 连接池大小。                                                                     
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| chunk-key.even-distribution.factor.upper-bound | Double   | 否       | 100    
 | 分块键分布因子的上限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子小于或等于此上限(即 (MAX(id) - MIN(id) + 1) / 
行数),则表分块将针对均匀分布进行优化。否则,如果分布因子较大,则表将被视为分布不均,如果估计的分片数超过 
`sample-sharding.threshold` 指定的值,则将使用基于采样的分片策略。默认值为 100.0。 |
+| chunk-key.even-distribution.factor.lower-bound | Double   | 否       | 0.05   
 | 分块键分布因子的下限。此因子用于确定表数据是否均匀分布。如果计算出的分布因子大于或等于此下限(即 (MAX(id) - MIN(id) + 1) / 
行数),则表分块将针对均匀分布进行优化。否则,如果分布因子较小,则表将被视为分布不均,如果估计的分片数超过 
`sample-sharding.threshold` 指定的值,则将使用基于采样的分片策略。默认值为 0.05。  |
+| sample-sharding.threshold                 | Integer  | 否       | 1000    | 
此配置指定触发采样分片策略的预估分片数阈值。当分布因子超出 `chunk-key.even-distribution.factor.upper-bound` 
和 `chunk-key.even-distribution.factor.lower-bound` 指定的范围,并且预估的分片数(计算为近似行数 / 
分块大小)超过此阈值时,将使用采样分片策略。这有助于更有效地处理大型数据集。默认值为 1000 个分片。                            
                                                       |
+| inverse-sampling.rate                     | Integer  | 否       | 1000    | 
采样分片策略中使用的采样率的倒数。例如,如果此值设置为 1000,则意味着在采样过程中应用 1/1000 
的采样率。此选项提供了控制采样粒度的灵活性,从而影响最终的分片数量。在处理首选较低采样率的极大型数据集时,它特别有用。默认值为 1000。           
                                                                                
                                                                   |
+| exactly_once                              | Boolean  | 否       | false   | 
启用精确一次语义。                                                                       
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| use_select_count                          | Boolean  | 否       | false   | 
使用 `select count` 统计表行数,而不是在全量阶段使用其他方法。在这种情况下,当通过分析表使用 SQL 更新统计信息更快时,直接使用 
`select count`。                                                                 
                                                                                
                                                                                
                                                                                
                      [...]
+| skip_analyze                              | Boolean  | 否       | false   | 
在全量阶段跳过表行数的分析。在这种情况下,您需要定期调度分析表 SQL 以更新相关表统计信息,或者您的表数据更改不频繁。                    
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| format                                    | Enum     | 否       | DEFAULT | 
Oracle CDC 的可选输出格式,有效枚举值为 `DEFAULT`、`COMPATIBLE_DEBEZIUM_JSON`。                 
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| schema-changes.enabled                    | Boolean  | 否       | false   | 
Schema 演进默认禁用。目前我们仅支持 `add column`、`drop column`、`rename column` 和 `modify 
column`。                                                                        
                                                                                
                                                                                
                                                                                
                     [...]
+| debezium                                  | Config   | 否       | -       | 
透传 [Debezium 
属性](https://github.com/debezium/debezium/blob/v1.9.8.Final/documentation/modules/ROOT/pages/connectors/oracle.adoc#connector-properties)
 给 Debezium Embedded Engine,该引擎用于捕获 Oracle 服务器的数据更改。                            
                                                                                
                                                                                
                           [...]
+| common-options                            |          | 否       | -       | 
源端插件常用参数,详情请参阅 [源端常用选项](../common-options/source-common-options.md)。            
                                                                                
                                                                                
                                                                                
                                                                                
                [...]
+| decimal_type_narrowing                    | Boolean | 否       | true         
   | 数值类型收缩,如果为 true,则在不损失精度的情况下,将 decimal 类型收缩为 int 或 long 类型。目前仅支持 
Oracle。请参阅下文的 `decimal_type_narrowing`。                                         
                                                                                
                                                                                
                                                                                
                         [...]
+
+
+### decimal_type_narrowing
+
+数值类型收缩,如果为 true,则在不损失精度的情况下,将 decimal 类型收缩为 int 或 long 类型。目前仅支持 Oracle。
+
+例如:
+
+decimal_type_narrowing = true
 
+| Oracle        | SeaTunnel |
+|---------------|-----------|
+| NUMBER(1, 0)  | Boolean   |
+| NUMBER(6, 0)  | INT       |
+| NUMBER(10, 0) | BIGINT    |
+
+decimal_type_narrowing = false
+
+| Oracle        | SeaTunnel      |
+|---------------|----------------|
+| NUMBER(1, 0)  | Decimal(1, 0)  |
+| NUMBER(6, 0)  | Decimal(6, 0)  |
+| NUMBER(10, 0) | Decimal(10, 0) |
+
+## 任务示例
+
+### 简单示例
+
+> 支持多表读取
+
+```conf
+source {
+  # 这是一个示例源端插件,**仅用于测试和演示源端插件功能**
+  Oracle-CDC {
+    plugin_output = "customers"
+    username = "system"
+    password = "oracle"
+    database-names = ["XE"]
+    schema-names = ["DEBEZIUM"]
+    table-names = ["XE.DEBEZIUM.FULL_TYPES", "XE.DEBEZIUM.FULL_TYPES2"]
+    url = "jdbc:oracle:thin:@oracle-host:1521:xe"
+    source.reader.close.timeout = 120000
+  }
+}
+```
+
+> 在全量阶段使用 select count(*) 代替 analysis table 来统计表行数
+```conf
+source {
+# 这是一个示例源端插件,**仅用于测试和演示源端插件功能**
+  Oracle-CDC {
+    plugin_output = "customers"
+    use_select_count = true 
+    username = "system"
+    password = "oracle"
+    database-names = ["XE"]
+    schema-names = ["DEBEZIUM"]
+    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
+    url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+    source.reader.close.timeout = 120000
+  }
+}
+```
+
+> 使用 select NUM_ROWS from all_tables 获取表行数,但跳过 analyze table 操作。
+
+```conf
+source {
+# 这是一个示例源端插件,**仅用于测试和演示源端插件功能**
+  Oracle-CDC {
+    plugin_output = "customers"
+    skip_analyze = true 
+    username = "system"
+    password = "oracle"
+    database-names = ["XE"]
+    schema-names = ["DEBEZIUM"]
+    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
+    url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+    source.reader.close.timeout = 120000
+  }
+}
+```
+
+### 支持表的自定义主键
+
+```conf
+source {
+  Oracle-CDC {
+    plugin_output = "customers"
+    url = "jdbc:oracle:thin:system/oracle@oracle-host:1521:xe"
+    source.reader.close.timeout = 120000
+    username = "system"
+    password = "oracle"
+    database-names = ["XE"]
+    schema-names = ["DEBEZIUM"]
+    table-names = ["XE.DEBEZIUM.FULL_TYPES"]
+    table-names-config = [
+      {
+        table = "XE.DEBEZIUM.FULL_TYPES"
+        primaryKeys = ["ID"]
+      }
+    ]
+  }
+}
+```
+
+### 支持以兼容 debezium 的格式发送到 kafka
+
+> 必须与 kafka 连接器 sink 配合使用,详情请参阅 [兼容 debezium 
格式](../formats/cdc-compatible-debezium-json.md)
+
+## 更新日志
+
+<ChangeLog />
diff --git a/docs/zh/connectors/source/SqlServer-CDC.md 
b/docs/zh/connectors/source/SqlServer-CDC.md
index 064c770008..5192bfaaab 100644
--- a/docs/zh/connectors/source/SqlServer-CDC.md
+++ b/docs/zh/connectors/source/SqlServer-CDC.md
@@ -79,8 +79,8 @@ Sql Server CDC 连接器允许从 SqlServer 数据库读取快照数据和增量
 | table-names                                    | List     | 是       | -      
 | 表名是模式名和表名的组合 (databaseName.schemaName.tableName)。                            
                                                                                
                                                                                
                                                          |
 | table-names-config                             | List     | 否       | -      
 | 表配置列表。例如:[{"table": "db1.schema1.table1","primaryKeys": 
["key1"],"snapshotSplitColumn": "key2"}]                                        
                                                                                
                                                                                
   |
 | url                                            | String   | 是       | -      
 | URL 必须包含数据库,如 "jdbc:sqlserver://localhost:1433;databaseName=test"。           
                                                                                
                                                                                
                                                             |
-| startup.mode                                   | Enum     | 否       | 
INITIAL | SqlServer CDC 消费者的可选启动模式,有效枚举为 "initial"、"earliest"、"latest" 和 
"specific"。                                                                     
                                                                                
                                                                 |
-| startup.timestamp                              | Long     | 否       | -      
 | 从指定的纪元时间戳(以毫秒为单位)开始。<br/> **注意,当 "startup.mode" 选项使用 `'timestamp'` 
时,此选项是必需的。**                                                                    
                                                                                
                                           |
+| startup.mode                                   | Enum     | 否       | 
INITIAL | SqlServer CDC 消费者的可选启动模式,有效枚举为 
"initial"、"earliest"、"latest"、"timestamp" 和 "specific"。                         
                                                                                
                                                                    |
+| startup.timestamp                              | Long     | 否       | -      
 | 从指定的纪元时间戳(以毫秒为单位)开始。当 `startup.mode = timestamp` 时,该时间戳会按 `server-time-zone` 
转换。<br/> **注意,当 "startup.mode" 选项使用 `'timestamp'` 时,此选项是必需的。**                  
                                                                                
                                                          |
 | startup.specific-offset.file                   | String   | 否       | -      
 | 从指定的 binlog 文件名开始。<br/>**注意,当 "startup.mode" 选项使用 `'specific'` 时,此选项是必需的。**  
                                                                                
                                                                                
                                           |
 | startup.specific-offset.pos                    | Long     | 否       | -      
 | 从指定的 binlog 文件位置开始。<br/>**注意,当 "startup.mode" 选项使用 `'specific'` 时,此选项是必需的。** 
                                                                                
                                                                                
                                          |
 | stop.mode                                      | Enum     | 否       | NEVER  
 | SqlServer CDC 消费者的可选停止模式,有效枚举为 "never"。                                      
                                                                                
                                                                                
                                                      |
@@ -90,7 +90,7 @@ Sql Server CDC 连接器允许从 SqlServer 数据库读取快照数据和增量
 | incremental.parallelism                        | Integer  | 否       | 1      
 | 增量阶段中并行读取器的数量。                                                               
                                                                                
                                                                                
                                                         |
 | snapshot.split.size                            | Integer  | 否       | 8096   
 | 表快照的分割大小(行数),读取表快照时,捕获的表会被分割为多个分割。                                           
                                                                                
                                                                                
                                     |
 | snapshot.fetch.size                            | Integer  | 否       | 1024   
 | 读取表快照时每次轮询的最大获取大小。                                                           
                                                                                
                                                                                
                                                     |
-| server-time-zone                               | String   | 否       | UTC    
 | 数据库服务器中的会话时区。                                                                
                                                                                
                                                                                
                                                          |
+| server-time-zone                               | String   | 否       | UTC    
 | 数据库服务器中的会话时区。该参数也用于将 `startup.timestamp` 转换为 LSN。若数据库时区与 JVM 时区不同,建议显式配置。    
                                                                                
                                                                                
                                                                               |
 | connect.timeout                                | Duration | 否       | 30s    
 | 连接器尝试连接到数据库服务器后在超时之前应该等待的最长时间。                                               
                                                                                
                                                                                
                                         |
 | connect.max-retries                            | Integer  | 否       | 3      
 | 连接器应该重试建立数据库服务器连接的最大重试次数。                                                    
                                                                                
                                                                                
                                              |
 | connection.pool.size                           | Integer  | 否       | 20     
 | 连接池大小。                                                                       
                                                                                
                                                                                
                                                                 |
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceOptions.java
index a00fe6e83f..ec18a4aaee 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceOptions.java
@@ -34,11 +34,14 @@ public class OracleIncrementalSourceOptions extends 
JdbcSourceOptions {
                     Options.key(SourceOptions.STARTUP_MODE_KEY)
                             .singleChoice(
                                     StartupMode.class,
-                                    Arrays.asList(StartupMode.INITIAL, 
StartupMode.LATEST))
+                                    Arrays.asList(
+                                            StartupMode.INITIAL,
+                                            StartupMode.LATEST,
+                                            StartupMode.TIMESTAMP))
                             .defaultValue(StartupMode.INITIAL)
                             .withDescription(
                                     "Optional startup mode for CDC source, 
valid enumerations are "
-                                            + "\"initial\", \"earliest\", 
\"latest\", \"timestamp\"\n or \"specific\"");
+                                            + "\"initial\", \"latest\" or 
\"timestamp\"");
 
     public static final SingleChoiceOption<StopMode> STOP_MODE =
             (SingleChoiceOption)
@@ -47,7 +50,7 @@ public class OracleIncrementalSourceOptions extends 
JdbcSourceOptions {
                             .defaultValue(StopMode.NEVER)
                             .withDescription(
                                     "Optional stop mode for CDC source, valid 
enumerations are "
-                                            + "\"never\", \"latest\", 
\"timestamp\"\n or \"specific\"");
+                                            + "\"never\"");
 
     public static final Option<List<String>> SCHEMA_NAMES =
             Options.key("schema-names")
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/offset/RedoLogOffsetFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/offset/RedoLogOffsetFactory.java
index 2d85a94cac..e0b3a97eae 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/offset/RedoLogOffsetFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/offset/RedoLogOffsetFactory.java
@@ -74,6 +74,11 @@ public class RedoLogOffsetFactory extends OffsetFactory {
 
     @Override
     public Offset timestamp(long timestamp) {
-        throw new UnsupportedOperationException("not supported create new 
Offset by timestamp.");
+        try (JdbcConnection jdbcConnection = 
dialect.openJdbcConnection(sourceConfig)) {
+            return OracleConnectionUtils.timestampToScn(
+                    jdbcConnection, timestamp, 
sourceConfig.getServerTimeZone());
+        } catch (Exception e) {
+            throw new RuntimeException("Convert timestamp to redoLog offset 
error", e);
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleConnectionUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleConnectionUtils.java
index 125dfed314..04db02151d 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleConnectionUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleConnectionUtils.java
@@ -33,9 +33,11 @@ import io.debezium.relational.TableId;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
+import java.util.Calendar;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.TimeZone;
 
 import static io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX;
 
@@ -84,6 +86,48 @@ public class OracleConnectionUtils {
         }
     }
 
+    /**
+     * Convert timestamp (milliseconds since epoch) to Oracle SCN.
+     *
+     * @param jdbc JDBC connection
+     * @param timestampMs timestamp in milliseconds since epoch
+     * @param serverTimeZone database server time zone
+     * @return RedoLogOffset with the corresponding SCN
+     */
+    public static RedoLogOffset timestampToScn(
+            JdbcConnection jdbc, long timestampMs, String serverTimeZone) {
+        try {
+            String effectiveServerTimeZone =
+                    serverTimeZone == null ? TimeZone.getDefault().getID() : 
serverTimeZone;
+            LOG.info(
+                    "Converting timestamp {} to SCN with server time zone {}",
+                    timestampMs,
+                    effectiveServerTimeZone);
+            String sql = "SELECT TIMESTAMP_TO_SCN(?) AS SCN FROM DUAL";
+            return jdbc.prepareQueryAndMap(
+                    sql,
+                    statement -> {
+                        java.sql.Timestamp timestamp = new 
java.sql.Timestamp(timestampMs);
+                        Calendar calendar =
+                                
Calendar.getInstance(TimeZone.getTimeZone(effectiveServerTimeZone));
+                        statement.setTimestamp(1, timestamp, calendar);
+                    },
+                    rs -> {
+                        if (rs.next()) {
+                            final String scn = rs.getString(1);
+                            LOG.info("Converted timestamp {} to SCN: {}", 
timestampMs, scn);
+                            return new 
RedoLogOffset(Scn.valueOf(scn).longValue());
+                        } else {
+                            throw new SeaTunnelException(
+                                    "Cannot convert timestamp to SCN. Make 
sure the specified timestamp is valid.");
+                        }
+                    });
+        } catch (SQLException e) {
+            LOG.error("Failed to convert timestamp to SCN", e);
+            throw new SeaTunnelException("Failed to convert timestamp to SCN", 
e);
+        }
+    }
+
     public static List<TableId> listTables(
             JdbcConnection jdbcConnection, String database, 
RelationalTableFilters tableFilters)
             throws SQLException {
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceOptions.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceOptions.java
index 49d1ffb728..888ba18b33 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceOptions.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerIncrementalSourceOptions.java
@@ -34,6 +34,7 @@ public class SqlServerIncrementalSourceOptions extends 
JdbcSourceOptions {
                                     Arrays.asList(
                                             StartupMode.INITIAL,
                                             StartupMode.EARLIEST,
+                                            StartupMode.TIMESTAMP,
                                             StartupMode.LATEST))
                             .defaultValue(StartupMode.INITIAL)
                             .withDescription(
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/offset/LsnOffsetFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/offset/LsnOffsetFactory.java
index 201b7a1c80..f24075ebb8 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/offset/LsnOffsetFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/offset/LsnOffsetFactory.java
@@ -73,6 +73,13 @@ public class LsnOffsetFactory extends OffsetFactory {
 
     @Override
     public Offset timestamp(long timestamp) {
-        throw new UnsupportedOperationException("not supported create new 
Offset by timestamp.");
+        try (JdbcConnection jdbcConnection = 
dialect.openJdbcConnection(sourceConfig)) {
+            return SqlServerUtils.timestampToLsn(
+                    (SqlServerConnection) jdbcConnection,
+                    timestamp,
+                    sourceConfig.getServerTimeZone());
+        } catch (Exception e) {
+            throw new RuntimeException("Convert timestamp to LSN offset 
error", e);
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtils.java
index 2ad9bac901..5867a84c87 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtils.java
@@ -46,13 +46,16 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Calendar;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.TimeZone;
 
 /** The utils for SqlServer data source. */
 @Slf4j
@@ -282,6 +285,83 @@ public class SqlServerUtils {
         }
     }
 
+    /**
+     * Convert timestamp (in milliseconds) to LSN using SQL Server's 
sys.fn_cdc_map_time_to_lsn
+     * function.
+     *
+     * @param connection SQL Server connection
+     * @param timestampMs timestamp in milliseconds
+     * @param serverTimeZone database server time zone
+     * @return LsnOffset corresponding to the timestamp
+     */
+    public static LsnOffset timestampToLsn(
+            SqlServerConnection connection, long timestampMs, String 
serverTimeZone) {
+        try {
+            String effectiveServerTimeZone =
+                    serverTimeZone == null ? TimeZone.getDefault().getID() : 
serverTimeZone;
+            String sql =
+                    "SELECT sys.fn_cdc_map_time_to_lsn('smallest greater than 
or equal', ?) AS lsn";
+
+            return connection.prepareQueryAndMap(
+                    sql,
+                    ps -> {
+                        Timestamp timestamp = new Timestamp(timestampMs);
+                        Calendar calendar =
+                                
Calendar.getInstance(TimeZone.getTimeZone(effectiveServerTimeZone));
+                        ps.setTimestamp(1, timestamp, calendar);
+                    },
+                    rs -> {
+                        if (!rs.next()) {
+                            throw new SQLException(
+                                    String.format(
+                                            "No LSN found for timestamp %d 
(%s)",
+                                            timestampMs, new 
Timestamp(timestampMs)));
+                        }
+                        byte[] lsnBytes = rs.getBytes("lsn");
+                        if (lsnBytes == null) {
+                            throw new SQLException(
+                                    String.format(
+                                            "LSN is null for timestamp %d 
(%s). "
+                                                    + "This may indicate that 
CDC is not enabled or the timestamp is too old.",
+                                            timestampMs, new 
java.sql.Timestamp(timestampMs)));
+                        }
+                        Lsn lsn = Lsn.valueOf(lsnBytes);
+                        log.info(
+                                "Converted timestamp {} ({}) to LSN: {}",
+                                timestampMs,
+                                new Timestamp(timestampMs),
+                                lsn);
+                        return LsnOffset.valueOf(lsn.toString());
+                    });
+        } catch (SQLException e) {
+            throw new SeaTunnelException(
+                    String.format(
+                            "Failed to convert timestamp %d (%s) to LSN: %s",
+                            timestampMs, new Timestamp(timestampMs), 
e.getMessage()),
+                    e);
+        }
+    }
+
+    /**
+     * Convert LSN string to LsnOffset.
+     *
+     * @param lsnString LSN string in format "00000027:00000a80:0003"
+     * @return LsnOffset
+     */
+    public static LsnOffset lsnStringToOffset(String lsnString) {
+        try {
+            // Validate LSN format
+            Lsn.valueOf(lsnString);
+            return LsnOffset.valueOf(lsnString);
+        } catch (Exception e) {
+            throw new SeaTunnelException(
+                    String.format(
+                            "Invalid LSN format: %s. Expected format: 
00000027:00000a80:0003",
+                            lsnString),
+                    e);
+        }
+    }
+
     /** Get split scan query for the given table. */
     public static String buildSplitScanQuery(
             TableId tableId, SeaTunnelRowType rowType, boolean isFirstSplit, 
boolean isLastSplit) {
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtilsTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtilsTest.java
index c10897bfd3..fa0c606c5e 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtilsTest.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtilsTest.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.offset.LsnOffset;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -70,4 +71,15 @@ public class SqlServerUtilsTest {
         Assertions.assertEquals(
                 "SELECT * FROM [db1].[schema1].[table1] WHERE [id] >= ?", 
splitScanSQL);
     }
+
+    @Test
+    public void testLsnStringToOffset() {
+        String lsnString = "00000027:00000a80:0003";
+        LsnOffset offset = SqlServerUtils.lsnStringToOffset(lsnString);
+        Assertions.assertEquals(lsnString, offset.getCommitLsn().toString());
+
+        String invalidLsn = "invalid_lsn";
+        Assertions.assertThrows(
+                RuntimeException.class, () -> 
SqlServerUtils.lsnStringToOffset(invalidLsn));
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
index 0f53bc7dc5..adcacd89fa 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java
@@ -41,6 +41,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -641,13 +642,69 @@ public class OracleCDCIT extends AbstractOracleCDCIT 
implements TestResource {
         return String.format(SOURCE_SQL_TEMPLATE, database, tableName);
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK},
+            disabledReason = "Currently SPARK do not support cdc")
+    public void testTimestampStartupMode(TestContainer container) throws 
Exception {
+        clearTable(SCEHMA_NAME, SINK_TABLE1);
+        clearTable(SCEHMA_NAME, SOURCE_TABLE1);
+
+        insertRow(1, SCEHMA_NAME, SOURCE_TABLE1);
+
+        // sleep for a while to make sure the timestamp is different
+        TimeUnit.SECONDS.sleep(5);
+        long startTimestamp = System.currentTimeMillis();
+        TimeUnit.SECONDS.sleep(5);
+
+        insertRow(2, SCEHMA_NAME, SOURCE_TABLE1);
+
+        CompletableFuture.supplyAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/oraclecdc_to_oracle_timestamp.conf",
+                                Arrays.asList("timestamp=" + startTimestamp));
+                    } catch (Exception e) {
+                        log.error("Commit task exception :" + e.getMessage());
+                        throw new RuntimeException(e);
+                    }
+                    return null;
+                });
+
+        await().atMost(300000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            List<List<Object>> sinkRows =
+                                    querySql(
+                                            "SELECT ID FROM "
+                                                    + SCEHMA_NAME
+                                                    + "."
+                                                    + SINK_TABLE1
+                                                    + " ORDER BY ID ASC");
+                            Assertions.assertTrue(
+                                    sinkRows.stream()
+                                            .anyMatch(row -> 
row.get(0).toString().equals("2")));
+                            Assertions.assertFalse(
+                                    sinkRows.stream()
+                                            .anyMatch(row -> 
row.get(0).toString().equals("1")));
+                        });
+    }
+
     private void insertSourceTable(String database, String tableName) {
+        insertRow(1, database, tableName);
+    }
+
+    private void insertRow(int id, String database, String tableName) {
         executeSql(
                 "INSERT INTO "
                         + database
                         + "."
                         + tableName
-                        + " VALUES (1, 'vc2', 'vc2', 'nvc2', 'c', 'nc',1.1, 
2.22, 3.33, 8.888, 4.4444, 5.555, 6.66, 1234.567891, 1234.567891, 77.323,1, 22, 
333, 4444, 5555, 1, 99, 1001, 999999999, 999999999999999999,94, 9949, 
999999994, 999999999999999949, 
99999999999999999999999999999999999949,TO_DATE('2022-10-30', 
'yyyy-mm-dd'),TO_TIMESTAMP('2022-10-30 12:34:56.00789', 'yyyy-mm-dd 
HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd 
HH24:MI:SS.FF5'),TO_TIMESTAMP('2022 [...]
+                        + " VALUES ("
+                        + id
+                        + ", 'vc2', 'vc2', 'nvc2', 'c', 'nc',1.1, 2.22, 3.33, 
8.888, 4.4444, 5.555, 6.66, 1234.567891, 1234.567891, 77.323,1, 22, 333, 4444, 
5555, 1, 99, 1001, 999999999, 999999999999999999,94, 9949, 999999994, 
999999999999999949, 
99999999999999999999999999999999999949,TO_DATE('2022-10-30', 
'yyyy-mm-dd'),TO_TIMESTAMP('2022-10-30 12:34:56.00789', 'yyyy-mm-dd 
HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12:34:56.12545', 'yyyy-mm-dd 
HH24:MI:SS.FF5'),TO_TIMESTAMP('2022-10-30 12: [...]
     }
 
     private void updateSourceTable(String database, String tableName) {
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_timestamp.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_timestamp.conf
new file mode 100644
index 0000000000..a4daae48fc
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/resources/oraclecdc_to_oracle_timestamp.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  Oracle-CDC {
+    plugin_output = "customers"
+    username = "system"
+    password = "top_secret"
+    database-names = ["ORCLCDB"]
+    schema-names = ["DEBEZIUM"]
+    table-names = ["ORCLCDB.DEBEZIUM.FULL_TYPES"]
+    url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
+    source.reader.close.timeout = 120000
+    connection.pool.size = 1
+    startup.mode = "timestamp"
+    startup.timestamp = ${timestamp}
+    debezium {
+        database.oracle.jdbc.timezoneAsRegion = "false"
+    }
+  }
+}
+
+sink {
+  Jdbc {
+    plugin_input = "customers"
+    driver = "oracle.jdbc.driver.OracleDriver"
+    url = "jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
+    username = "system"
+    password = "top_secret"
+    generate_sink_sql = true
+    database = "ORCLCDB"
+    table = "DEBEZIUM.SINK_FULL_TYPES"
+    batch_size = 1
+    primary_keys = ["ID"]
+    connection.pool.size = 1
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index db3e750186..b70f60843b 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -718,4 +718,60 @@ public class SqlServerCDCIT extends TestSuiteBase 
implements TestResource {
                                     querySql(selectSql, sinkTable));
                         });
     }
+
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK},
+            disabledReason = "Currently SPARK do not support cdc")
+    public void testTimestampStartupMode(TestContainer container) throws 
InterruptedException {
+        initializeSqlServerTable(DATABASE_NAME);
+        executeSql("TRUNCATE TABLE " + DATABASE_NAME + "." + SCHEMA_NAME + 
".full_types_sink;");
+
+        // Use full fields insert to avoid implicit conversion error for 
varbinary columns with null
+        // value
+        executeSql(
+                "INSERT INTO "
+                        + SOURCE_TABLE_CUSTOM_PRIMARY_KEY
+                        + " VALUES (1, 'cč1', 'vcč', 'tč', N'cč', N'vcč', 
N'tč', 1.123, 2, 3.323, 4.323, 5.323, 6.323, 1, 22, 333, 4444, 55555, 
'2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', 
'2018-07-13 14:23:45', '<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS 
varbinary(100)), 5.32)");
+
+        // sleep for a while to make sure the timestamp is different
+        TimeUnit.SECONDS.sleep(5);
+        long startTimestamp = System.currentTimeMillis();
+        TimeUnit.SECONDS.sleep(5);
+
+        executeSql(
+                "INSERT INTO "
+                        + SOURCE_TABLE_CUSTOM_PRIMARY_KEY
+                        + " VALUES (2, 'cč2', 'vcč', 'tč', N'cč', N'vcč', 
N'tč', 1.123, 2, 3.323, 4.323, 5.323, 6.323, 1, 22, 333, 4444, 55555, 
'2018-07-13', '10:23:45', '2018-07-13 11:23:45.34', '2018-07-13 13:23:45.78', 
'2018-07-13 14:23:45', '<a>b</a>',SYSDATETIMEOFFSET(),CAST('test_varbinary' AS 
varbinary(100)), 5.32)");
+
+        CompletableFuture.runAsync(
+                () -> {
+                    try {
+                        container.executeJob(
+                                "/sqlservercdc_to_sqlserver_timestamp.conf",
+                                Arrays.asList("timestamp=" + startTimestamp));
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        await().atMost(300000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            List<List<Object>> sinkRows =
+                                    querySql(
+                                            "SELECT id FROM "
+                                                    + DATABASE_NAME
+                                                    + "."
+                                                    + SCHEMA_NAME
+                                                    + ".full_types_sink ORDER 
BY id ASC");
+                            Assertions.assertTrue(
+                                    sinkRows.stream()
+                                            .anyMatch(row -> 
row.get(0).toString().equals("2")));
+                            Assertions.assertFalse(
+                                    sinkRows.stream()
+                                            .anyMatch(row -> 
row.get(0).toString().equals("1")));
+                        });
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_timestamp.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_timestamp.conf
new file mode 100644
index 0000000000..f0e178ebfb
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/resources/sqlservercdc_to_sqlserver_timestamp.conf
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
+
+source {
+  SqlServer-CDC {
+    plugin_output = "customers"
+    username = "sa"
+    password = "Password!"
+    database-names = ["column_type_test"]
+    table-names = ["column_type_test.dbo.full_types_custom_primary_key"]
+    url = "jdbc:sqlserver://sqlserver-host:1433;databaseName=column_type_test"
+    startup.mode = "timestamp"
+    startup.timestamp = ${timestamp}
+    exactly_once = true
+    table-names-config = [
+      {
+        table = "column_type_test.dbo.full_types_custom_primary_key"
+        primaryKeys = ["id"]
+      }
+    ]
+  }
+}
+
+sink {
+  Jdbc {
+    plugin_input = "customers"
+    driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
+    url = "jdbc:sqlserver://sqlserver-host:1433;encrypt=false"
+    user = "sa"
+    password = "Password!"
+    generate_sink_sql = true
+    database = "column_type_test"
+    table = "dbo.full_types_sink"
+    batch_size = 1
+    primary_keys = ["id"]
+  }
+}


Reply via email to