LeonYoah commented on PR #10428:
URL: https://github.com/apache/seatunnel/pull/10428#issuecomment-3835979833
> ## Issue 1: Inconsistent timezone handling causes timestamp conversion errors
> **Location**:
>
> * `seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleConnectionUtils.java:101-102`
> * `seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtils.java:302-303`
>
> **Related Context**:
>
> * Base class: `StartupConfig.java:37` - `private final Long timestamp;`
> * Caller: `StartupConfig.java:50` - `return offsetFactory.timestamp(timestamp);`
> * Config definition: `SourceOptions.java:53-58` - `STARTUP_TIMESTAMP` described as "timestamp(mills)"
>
> **Problem Description**:
>
> ```java
> // Current implementation
> java.sql.Timestamp timestamp = new java.sql.Timestamp(timestampMs);
> statement.setTimestamp(1, timestamp); // Uses the JVM default timezone
> ```
>
> The `startup.timestamp` configuration explicitly states "milliseconds since Unix epoch" (an absolute, UTC-based instant), but the code uses `java.sql.Timestamp` and `PreparedStatement.setTimestamp()` without a `Calendar`, which rely on the JVM default timezone. If the JVM's default timezone is not UTC, this leads to:
>
> * Users expect to start from UTC 2024-01-01 00:00:00
> * On Beijing servers (UTC+8), it actually starts from 2024-01-01 08:00:00
> * Causing data loss or incorrect starting points
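A minimal, self-contained illustration of the concern above (not code from the PR): one epoch-millisecond value corresponds to different wall-clock times depending on the zone it is rendered in. The JDBC documentation for `setTimestamp` states that, when no `Calendar` is given, the driver uses the JVM default timezone, and `Timestamp.toString()` shows the same dependency. The class name `EpochZoneDemo` is illustrative only.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.TimeZone;

final class EpochZoneDemo {
    // Wall-clock time that an epoch-millisecond value corresponds to in the given zone.
    static LocalDateTime wallClock(long epochMs, String zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMs), ZoneId.of(zone));
    }

    public static void main(String[] args) {
        long startupTimestamp = 1704067200000L; // 2024-01-01T00:00:00Z

        System.out.println(wallClock(startupTimestamp, "UTC"));           // 2024-01-01T00:00
        System.out.println(wallClock(startupTimestamp, "Asia/Shanghai")); // 2024-01-01T08:00

        // Timestamp.toString() renders in the JVM default zone, so the same instant
        // prints as a different wall-clock time once the default zone changes.
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
        System.out.println(new Timestamp(startupTimestamp)); // 2024-01-01 08:00:00.0
    }
}
```

The instant never changes; only its wall-clock rendering does, which is why the zone used for binding has to be chosen explicitly.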
>
> **Potential Risks**:
>
> * **Risk 1**: In cross-timezone deployment scenarios, user-configured
timestamps don't match actual start times, leading to data loss
> * **Risk 2**: In containerized/cloud environments, node timezones may be
uncontrollable, increasing deployment risks
> * **Risk 3**: Users in different regions may understand timestamps
differently, potentially causing confusion
>
> **Impact Scope**:
>
> * **Direct Impact**:
>   * `OracleConnectionUtils.timestampToScn()`
>   * `SqlServerUtils.timestampToLsn()`
> * **Indirect Impact**: All Oracle/SQL Server CDC tasks using `startup.mode = "timestamp"`
> * **Affected Area**: Single connector (Oracle, SQL Server); does not affect other connectors
>
> **Severity**: **MAJOR** (may cause data loss and user confusion)
>
> **Improvement Suggestions**:
>
> ```java
> // Oracle implementation
> public static RedoLogOffset timestampToScn(JdbcConnection jdbc, long timestampMs) {
>     try {
>         LOG.info("Converting timestamp {} to SCN", timestampMs);
>         String sql = "SELECT TIMESTAMP_TO_SCN(?) AS SCN FROM DUAL";
>         return jdbc.prepareQueryAndMap(
>                 sql,
>                 statement -> {
>                     // Solution 1: use a Calendar to explicitly specify the UTC timezone
>                     java.util.Calendar utcCalendar =
>                             java.util.Calendar.getInstance(java.util.TimeZone.getTimeZone("UTC"));
>                     java.sql.Timestamp timestamp = new java.sql.Timestamp(timestampMs);
>                     statement.setTimestamp(1, timestamp, utcCalendar);
>                 },
>                 rs -> {
>                     if (rs.next()) {
>                         final String scn = rs.getString(1);
>                         LOG.info("Converted timestamp {} to SCN: {}", timestampMs, scn);
>                         return new RedoLogOffset(Scn.valueOf(scn).longValue());
>                     } else {
>                         throw new SeaTunnelException(
>                                 "Cannot convert timestamp to SCN. Make sure the specified timestamp is valid.");
>                     }
>                 });
>     } catch (SQLException e) {
>         LOG.error("Failed to convert timestamp to SCN", e);
>         throw new SeaTunnelException("Failed to convert timestamp to SCN", e);
>     }
> }
>
> // SQL Server implementation: similar modification
> ```
>
> **Rationale**:
>
> * Explicitly use UTC timezone to stay consistent with config item
documentation ("milliseconds since Unix epoch")
> * Avoid reliance on JVM default timezone, improving portability
> * Conforms to standard Unix timestamp semantics (starting from UTC
1970-01-01 00:00:00)
> * Need to modify both connectors synchronously to ensure consistent
behavior
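An alternative to the `Calendar` overload, sketched under assumptions: convert `startup.timestamp` into a `LocalDateTime` in an explicitly chosen zone and bind that. The zone has to match the zone in which the database interprets the bound value; UTC appears below only because that is what the review proposes. `StartupTimestamps` and its method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

final class StartupTimestamps {
    private StartupTimestamps() {}

    /**
     * Converts startup.timestamp (epoch millis) into the wall-clock value the database
     * should see, in an explicitly chosen zone instead of the JVM default.
     */
    static LocalDateTime toDatabaseWallClock(long epochMs, ZoneId databaseZone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMs), databaseZone);
    }

    /** Convenience overload for the UTC interpretation suggested in the review. */
    static LocalDateTime toUtcWallClock(long epochMs) {
        return toDatabaseWallClock(epochMs, ZoneOffset.UTC);
    }
}
```

Inside the `StatementPreparer`, the value could then be bound with `statement.setObject(1, StartupTimestamps.toUtcWallClock(timestampMs))`, which JDBC 4.2 maps to `TIMESTAMP`. Whether the Oracle and SQL Server drivers used by these connectors accept `LocalDateTime` this way is worth confirming before relying on it.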
>
> ## Issue 2: SQL Server CDC function parameter semantics may cause data loss
> **Location**:
>
> *
`seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/utils/SqlServerUtils.java:297`
>
> **Related Context**:
>
> * Caller: `LsnOffsetFactory.java:77` - `return
SqlServerUtils.timestampToLsn((SqlServerConnection) jdbcConnection, timestamp);`
> * Documentation: SQL Server CDC documentation explains
`sys.fn_cdc_map_time_to_lsn` parameter options
>
> **Problem Description**:
>
> ```java
> String sql = "SELECT sys.fn_cdc_map_time_to_lsn('smallest greater than or equal', ?) AS lsn";
> ```
>
> `'smallest greater than or equal'` means returning the smallest LSN that
is **greater than or equal to** the given time. This means:
>
> * If user specifies timestamp as `T`, CDC may start from `T' > T`
> * If there are data changes in the `[T, T')` interval, this data will be
lost
> * May be inconsistent with Oracle's `TIMESTAMP_TO_SCN` semantics
>
> **Potential Risks**:
>
> * **Risk 1**: Data after the user-specified time point may be missed
> * **Risk 2**: Does not match user expectations ("start from this time
point")
> * **Risk 3**: Inconsistent with other databases (Oracle, MySQL) behavior,
increasing user cognitive burden
>
> **Impact Scope**:
>
> * **Direct Impact**: `SqlServerUtils.timestampToLsn()`
> * **Indirect Impact**: All SQL Server CDC tasks using `startup.mode =
"timestamp"`
> * **Affected Area**: Single Connector (SQL Server)
>
> **Severity**: **MAJOR** (may cause data loss)
>
> **Improvement Suggestions**:
>
> ```java
> // Solution 1: Use 'largest less than or equal' (recommended)
> String sql = "SELECT sys.fn_cdc_map_time_to_lsn('largest less than or equal', ?) AS lsn";
>
> // Solution 2: Clearly document the behavioral difference
> // Add an explanation in SqlServer-CDC.md:
> // "Note: The timestamp startup mode uses the 'smallest greater than or equal'
> // strategy, which may start from a time slightly later than the specified
> // timestamp to ensure data integrity."
> ```
>
> **Rationale**:
>
> * Use `'largest less than or equal'` to ensure that no data at the user-specified time point is lost
> * If current option must be used, needs explicit documentation to set
clear user expectations
> * Considering data integrity, recommend modifying to `'largest less than
or equal'`
>
> ## Issue 3: Exception handling uses RuntimeException instead of SeaTunnelException
> **Location**:
>
> * `seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/offset/RedoLogOffsetFactory.java:79-81`
> * `seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/offset/LsnOffsetFactory.java:76-78`
>
> **Related Context**:
>
> * Similar methods: `RedoLogOffsetFactory.latest()` - uses
`RuntimeException("Read the redoLog offset error", e)`
> * Utility methods: `OracleConnectionUtils.timestampToScn()` - uses
`SeaTunnelException("Failed to convert timestamp to SCN", e)`
>
> **Problem Description**:
>
> ```java
> // OffsetFactory layer
> catch (Exception e) {
>     throw new RuntimeException("Convert timestamp to redoLog offset error", e);
> }
>
> // Utility class layer
> catch (SQLException e) {
>     throw new SeaTunnelException("Failed to convert timestamp to SCN", e);
> }
> ```
>
> Inconsistent exception handling hierarchy:
>
> * `OffsetFactory` layer throws `RuntimeException`
> * `OracleConnectionUtils` layer throws `SeaTunnelException`
>
> **Potential Risks**:
>
> * **Risk 1**: Upper-layer calling code needs to handle `RuntimeException` and `SeaTunnelException` separately
> * **Risk 2**: Inconsistent error handling logic may cause some exceptions
not to be caught correctly
> * **Risk 3**: Inconsistent with exception types of other CDC connectors
(MySQL)
>
> **Impact Scope**:
>
> * **Direct Impact**:
>   * `RedoLogOffsetFactory.timestamp()`
>   * `LsnOffsetFactory.timestamp()`
> * **Indirect Impact**: Upper-layer code calling `OffsetFactory.timestamp()`
> * **Affected Area**: Oracle and SQL Server CDC connectors
>
> **Severity**: **MINOR** (does not affect functionality, but affects code
consistency and maintainability)
>
> **Improvement Suggestions**:
>
> ```java
> // Oracle implementation
> @Override
> public Offset timestamp(long timestamp) {
>     try (JdbcConnection jdbcConnection = dialect.openJdbcConnection(sourceConfig)) {
>         return OracleConnectionUtils.timestampToScn(jdbcConnection, timestamp);
>     } catch (Exception e) {
>         throw new SeaTunnelException("Convert timestamp to redoLog offset error", e);
>     }
> }
>
> // SQL Server implementation
> @Override
> public Offset timestamp(long timestamp) {
>     try (JdbcConnection jdbcConnection = dialect.openJdbcConnection(sourceConfig)) {
>         return SqlServerUtils.timestampToLsn((SqlServerConnection) jdbcConnection, timestamp);
>     } catch (Exception e) {
>         throw new SeaTunnelException("Convert timestamp to LSN offset error", e);
>     }
> }
> ```
>
> **Rationale**:
>
> * Unify using `SeaTunnelException` to stay consistent with other layers
> * Facilitates unified exception handling in upper layer code
> * Maintains consistency with utility class layer exception types
> * Need to synchronously check other `OffsetFactory` methods (such as
`earliest()`, `latest()`) for similar issues
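One detail to watch when unifying: if the OffsetFactory layer keeps `catch (Exception e)`, it will wrap the `SeaTunnelException` already thrown by the utility layer a second time. A minimal sketch of one way to avoid that, using a stand-in `ConnectorException` in place of `SeaTunnelException` (whose constructors and hierarchy are not shown in this thread) and a hypothetical helper named `callWithUnifiedErrors`:

```java
import java.util.concurrent.Callable;

final class OffsetErrors {
    private OffsetErrors() {}

    /** Hypothetical stand-in for the connector's unified exception type (e.g. SeaTunnelException). */
    static final class ConnectorException extends RuntimeException {
        ConnectorException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Runs an offset lookup and surfaces every failure as ConnectorException,
     * rethrowing one produced by a lower layer as-is instead of wrapping it again.
     */
    static <T> T callWithUnifiedErrors(String message, Callable<T> action) {
        try {
            return action.call();
        } catch (ConnectorException e) {
            throw e; // already the unified type: keep its original message and cause
        } catch (Exception e) {
            throw new ConnectorException(message, e); // e.g. SQLException from JDBC
        }
    }
}
```

A `timestamp()` override could then pass the try-with-resources lookup from the suggestion above as the `Callable`, so callers only ever see one exception type whether the failure came from opening the connection or from the SCN/LSN query.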
>
> ## Issue 4: Documentation description inconsistent with actually supported startup modes
> **Location**:
>
> * `seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSourceOptions.java:44-46`
>
> **Related Context**:
>
> * Enums supported in code: `StartupMode.INITIAL, StartupMode.LATEST,
StartupMode.TIMESTAMP`
> * Documentation description: `"initial", "latest" or "timestamp"`
> * Also actually supports: `earliest` and `specific` (inherited from the parent class or referenced from other methods)
>
> **Problem Description**:
>
> ```java
> .withDescription(
> "Optional startup mode for CDC source, valid enumerations are "
> + "\"initial\", \"latest\" or \"timestamp\"");
> ```
>
> The description text only lists `"initial"`, `"latest"`, `"timestamp"`,
but Oracle CDC actually also supports:
>
> * `earliest`: Start from earliest offset
> * `specific`: Start from a specified offset (requires `startup.specific-offset.file` and `startup.specific-offset.pos`)
>
> This may lead users to mistakenly believe these modes are not supported.
>
> **Potential Risks**:
>
> * **Risk 1**: Users don't know they can use the `earliest` or `specific` modes
> * **Risk 2**: Code and documentation inconsistency increases maintenance
costs
> * **Risk 3**: May lead to false bug reports (users think functionality is missing)
>
> **Impact Scope**:
>
> * **Direct Impact**: JavaDoc of
`OracleIncrementalSourceOptions.STARTUP_MODE`
> * **Indirect Impact**: Users viewing code or generated API documentation
> * **Affected Area**: Oracle CDC Connector
>
> **Severity**: **MINOR** (does not affect functionality, but affects user
experience)
>
> **Improvement Suggestions**:
>
> ```java
> // Restore complete enum list (consistent with dev branch)
> .withDescription(
> "Optional startup mode for CDC source, valid enumerations are "
> + "\"initial\", \"earliest\", \"latest\", \"timestamp\" or \"specific\"");
> ```
>
> **Rationale**:
>
> * Stay consistent with actually supported functionality
> * Stay consistent with original dev branch description (except for newly
added `timestamp`)
> * Avoid misleading users
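Since the description text drifted away from the supported modes, one option is to build it from the same list of values the option accepts. A sketch using plain strings: the mode list is copied from the suggestion above, and wiring it to the connector's actual `StartupMode` enum is an assumption, because that enum's string form is not shown here. `StartupModeDescription` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class StartupModeDescription {
    private StartupModeDescription() {}

    /** Formats values as: "a", "b" or "c" (the style used by the existing description). */
    static String quotedEnumeration(List<String> values) {
        List<String> quoted = values.stream()
                .map(v -> "\"" + v + "\"")
                .collect(Collectors.toList());
        if (quoted.size() <= 1) {
            return quoted.isEmpty() ? "" : quoted.get(0);
        }
        String head = String.join(", ", quoted.subList(0, quoted.size() - 1));
        return head + " or " + quoted.get(quoted.size() - 1);
    }

    public static void main(String[] args) {
        // Assumed source of truth; in the connector this would be the modes the option accepts.
        List<String> modes = Arrays.asList("initial", "earliest", "latest", "timestamp", "specific");
        System.out.println("Optional startup mode for CDC source, valid enumerations are "
                + quotedEnumeration(modes));
    }
}
```

For the five modes above this produces the same text as the suggested `.withDescription(...)` string, and adding a mode to the list updates the description automatically.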
>
> ## Issue 5: Insufficient time separation in E2E tests may cause instability
> **Location**:
>
> * `seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java:655-668`
> * `seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java:736-749`
>
> **Related Context**:
>
> * Test method: `testTimestampStartupMode()`
> * Test scenario: Verify that when starting from a specified timestamp,
only data after the timestamp is read
>
> **Problem Description**:
>
> ```java
> insertRow(1, SCEHMA_NAME, SOURCE_TABLE1); // Insert ID=1
> TimeUnit.SECONDS.sleep(5);
> long startTimestamp = System.currentTimeMillis(); // Record timestamp
> TimeUnit.SECONDS.sleep(5);
> insertRow(2, SCEHMA_NAME, SOURCE_TABLE1); // Insert ID=2
> ```
>
> Test design has the following issues:
>
> 1. **Insufficient time granularity**: 5-second sleep may not be enough to
ensure timestamp distinction (especially in high-load test environments)
> 2. **Race condition**: `System.currentTimeMillis()` may be called during
`insertRow(2)` execution, causing inaccurate timestamps
> 3. **Untested boundaries**: Does not verify boundary cases (timestamp
earlier than earliest available SCN/LSN, later than latest SCN/LSN)
> 4. **Timezone not verified**: Test does not explicitly verify timezone
conversion correctness
>
> **Potential Risks**:
>
> * **Risk 1**: In high-load CI environments, tests may fail due to
inaccurate timestamps (false negatives)
> * **Risk 2**: Boundary cases not covered, untested errors may occur in
production
> * **Risk 3**: Timezone-related bugs may not be detected
>
> **Impact Scope**:
>
> * **Direct Impact**: `OracleCDCIT.testTimestampStartupMode()`,
`SqlServerCDCIT.testTimestampStartupMode()`
> * **Indirect Impact**: CI/CD process stability
> * **Affected Area**: E2E test suite
>
> **Severity**: **MINOR** (test quality issue, does not affect production
code functionality)
>
> **Improvement Suggestions**:
>
> ```java
> @TestTemplate
> public void testTimestampStartupMode(TestContainer container) throws Exception {
>     clearTable(SCEHMA_NAME, SINK_TABLE1);
>     clearTable(SCEHMA_NAME, SOURCE_TABLE1);
>
>     // Insert first batch of data
>     insertRow(1, SCEHMA_NAME, SOURCE_TABLE1);
>
>     // Ensure timestamp distinction
>     TimeUnit.SECONDS.sleep(10);
>
>     // Use database time instead of JVM time
>     long startTimestamp = getCurrentDatabaseTimestamp(); // Add helper method
>     TimeUnit.SECONDS.sleep(10);
>
>     // Insert second batch of data
>     insertRow(2, SCEHMA_NAME, SOURCE_TABLE1);
>
>     // Start CDC task...
>
>     await().atMost(300000, TimeUnit.MILLISECONDS)
>             .untilAsserted(() -> {
>                 // Verify logic...
>             });
> }
>
> // Helper method: get the current database timestamp
> private long getCurrentDatabaseTimestamp() throws Exception {
>     // Execute SQL: SELECT CURRENT_TIMESTAMP FROM DUAL (Oracle)
>     // Or: SELECT SYSDATETIME() (SQL Server)
>     // Return a millisecond timestamp
>     throw new UnsupportedOperationException("Sketch only: query the database clock here");
> }
> ```
>
> **Rationale**:
>
> * Using database timestamps is more accurate, avoiding JVM timezone issues
> * Increase the sleep time to 10 seconds to improve test stability
> * Recommend adding boundary case tests (early/late timestamps)
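Reading the database clock only removes the JVM-timezone dependency if the returned value is converted to epoch milliseconds in the right zone: SQL Server `SYSDATETIME()` returns a `datetime2` with no offset. A sketch of that conversion step, assuming the hypothetical helper reads the value as a `LocalDateTime` (for example with `ResultSet.getObject(1, LocalDateTime.class)`, JDBC 4.2) and that the test knows the database's time zone. `DbClock` is an illustrative name.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

final class DbClock {
    private DbClock() {}

    /**
     * Converts a wall-clock value read from the database (one that carries no offset)
     * into epoch milliseconds, interpreting it in the database's zone rather than
     * the JVM default.
     */
    static long toEpochMillis(LocalDateTime dbWallClock, ZoneId databaseZone) {
        return dbWallClock.atZone(databaseZone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime readFromDb = LocalDateTime.of(2024, 1, 1, 8, 0);
        System.out.println(toEpochMillis(readFromDb, ZoneId.of("Asia/Shanghai"))); // 1704067200000
        System.out.println(toEpochMillis(readFromDb, ZoneId.of("UTC")));           // 1704096000000
    }
}
```

The same wall-clock reading is eight hours apart as an instant depending on the assumed zone, so a wrong zone here would reintroduce exactly the offset that Issue 1 describes.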
>
> ## Issue 6: Oracle Chinese documentation contains large unrelated changes
> **Location**:
>
> * `docs/zh/connectors/source/Oracle-CDC.md` (383 lines of changes)
>
> **Related Context**:
>
> * PR main functionality: Add timestamp startup mode support
> * Documentation changes: Besides timestamp-related content, the PR also adds extensive Oracle CDC setup steps and CDB/PDB mode descriptions
>
> **Problem Description**:
>
> Oracle Chinese documentation changes (383 lines) include extensive content
unrelated to timestamp functionality:
>
> * Supplemented complete Oracle Logminer enablement steps
> * Added detailed setup instructions for CDB + PDB mode
> * Added data type mapping table
> * Added detailed source option descriptions
>
> Although this content improves documentation quality:
>
> 1. **Exceeds this PR scope**: Should be split into independent
documentation improvement PR
> 2. **Increases Review difficulty**: Large documentation changes make Code
Review more difficult
> 3. **Mixed responsibilities**: Feature development and documentation
improvement should be separated
>
> **Potential Risks**:
>
> * **Risk 1**: Documentation changes may mask code issues
> * **Risk 2**: If PR is rejected, documentation improvements will also be
reverted
> * **Risk 3**: Increases possibility of merge conflicts (dev branch may
have other documentation changes)
>
> **Impact Scope**:
>
> * **Direct Impact**: `docs/zh/connectors/source/Oracle-CDC.md`
> * **Indirect Impact**: Review efficiency and PR merge process
> * **Affected Area**: Documentation
>
> **Severity**: **MINOR** (process issue, does not affect functionality)
>
> **Improvement Suggestions**:
>
> ```
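> Option 1 (recommended): split the PR
> - PR #10428 (this PR): keep only the documentation changes related to the timestamp feature
> - New PR #XXXXX: Oracle CDC documentation improvements (including LogMiner setup, CDB/PDB notes, etc.)
>
> Option 2: accept the current PR
> - Acceptable once the documentation content is confirmed to be correct
> - Future PRs should follow the "one PR, one responsibility" principle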
> ```
>
> **Rationale**:
>
> * Follow single responsibility principle for easier review and testing
> * Reduce merge risk
> * If code needs rollback, documentation improvements won't be affected
>
> ### VI. Overall Assessment Conclusion
> #### Can it be Merged
> **Recommendation: Conditionally Approved**
>
> **Rationale**:
>
> 1. **Functional Completeness**: ✅ Core functionality implemented
correctly, solves real user pain points
> 2. **Code Quality**: ⚠️ Has 2 MAJOR level issues (timezone handling, SQL
Server semantics)
> 3. **Test Coverage**: ⚠️ E2E tests have stability issues, insufficient
unit tests
> 4. **Documentation Quality**: ⚠️ Documentation changes too large, exceeds
this PR scope
>
> **Recommended Pre-merge Conditions**:
>
> **Must Fix (BLOCKER)**:
>
> * ✅ **Issue 1**: Timezone handling (fixed via latest commit "Fix timezone
issue", but needs confirmation)
>
> **Strongly Recommended to Fix (MAJOR)**:
>
> * ⚠️ **Issue 2**: SQL Server CDC function parameter semantics
> * ⚠️ **Issue 3**: Exception type consistency
>
> **Recommended to Fix (MINOR)**:
>
> * **Issue 4**: Documentation description consistency
> * **Issue 5**: E2E test improvements
> * **Issue 6**: Documentation split (optional, but recommended)
>
> #### Improvement Priority
> **High Priority**:
>
> 1. Confirm timezone fix is effective (verify latest commit)
> 2. Modify SQL Server to `'largest less than or equal'` or clarify in
documentation
> 3. Unify exception types to `SeaTunnelException`
>
> **Medium Priority**:
>
> 4. Fix documentation description inconsistency
> 5. Improve E2E tests (use database timestamps)
>
> **Low Priority**:
>
> 6. Add Oracle unit tests
> 7. Add boundary case tests
> 8. Supplement Metrics
>
> #### Architecture Rationality Assessment
> **Overall Rating**: ⭐⭐⭐⭐ (4/5)
>
> **Strengths**:
>
> * ✅ Reuses existing infrastructure, conforms to architectural design
> * ✅ Consistent with other CDC connector implementation patterns
> * ✅ Good backward compatibility
> * ✅ High code readability and maintainability
>
> **Room for Improvement**:
>
> * ⚠️ Timezone handling needs clear specifications
> * ⚠️ Timestamp semantic differences across databases need documentation
> * ⚠️ PostgreSQL support missing (needs architecture-level discussion)
>
> #### Long-term Recommendations
> 1. **Establish CDC timestamp specification**:
>    * Clarify timestamp startup mode behavior for all CDC connectors
>    * Unify timezone handling strategy
>    * Document timestamp precision for each database
> 2. **Improve PostgreSQL support**:
>    * Evaluate whether timestamp startup mode can be supported through other means, such as approximate lookup or time range queries
> 3. **Enhance test coverage**:
>    * Establish a standardized timestamp startup mode test suite
>    * Add cross-timezone tests
>    * Add boundary case tests
## Response to Issue 2: SQL Server CDC `sys.fn_cdc_map_time_to_lsn` Parameter Selection
Thank you for the detailed analysis. After careful review, I believe **the
current implementation using `'smallest greater than or equal'` is correct and
should not be changed**. Here's my reasoning:
### 1. Semantic Correctness
The `startup.timestamp` option semantically means **"start capturing changes
from this time point"**, not "include changes before this time point".
- When a user specifies `startup.timestamp = T`, they expect to receive
changes that occurred **at or after** time T
- `'smallest greater than or equal'` returns the first LSN whose timestamp is >= T, which exactly matches this expectation
### 2. No Data Loss Occurs
The concern about data loss in the `[T, T')` interval is based on a
misunderstanding:
- `'smallest greater than or equal'` returns the **minimum LSN that is >= T**
- If there's a change exactly at time T, that LSN is returned
- If there's no change at exactly time T, the first change after T is
returned
- There is no gap - T' **is** the timestamp of the first LSN >= T
### 3. Consistency with MySQL CDC Behavior
MySQL CDC's `findBinlogOffsetBytimestamp` uses the same approach - it finds
the **first binlog position >= the specified timestamp** using binary search.
The current SQL Server implementation is consistent with this behavior.
### 4. Using `'largest less than or equal'` Would Cause Issues
If we change to `'largest less than or equal'`:
- User specifies time T, but receives an LSN from **before** T
- This would cause **duplicate data** to be read (data that was already
processed before T)
- This is the actual semantic inconsistency we should avoid
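To make the two options concrete, here is a toy in-memory model of how they pick a starting LSN. It is not SQL Server's implementation: commit times and LSNs are plain `long` values, `LsnMappingModel` is a made-up name, and returning `null` when no change qualifies is a simplification. It finds the boundary with a binary search, the same "first position >= timestamp" idea described for MySQL CDC in section 3.

```java
import java.util.Arrays;
import java.util.List;

final class LsnMappingModel {
    private LsnMappingModel() {}

    /** A committed change: commit time in epoch ms and its LSN, both modeled as longs. */
    static final class Change {
        final long commitTimeMs;
        final long lsn;

        Change(long commitTimeMs, long lsn) {
            this.commitTimeMs = commitTimeMs;
            this.lsn = lsn;
        }
    }

    /**
     * Binary search over a log sorted by commit time: returns the first index whose
     * commit time is >= t (or > t when strict), or log.size() if there is none.
     */
    static int firstIndex(List<Change> log, long t, boolean strict) {
        int lo = 0;
        int hi = log.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            long c = log.get(mid).commitTimeMs;
            boolean beforeTarget = strict ? c <= t : c < t;
            if (beforeTarget) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }

    /** 'smallest greater than or equal': LSN of the first change at or after t, or null. */
    static Long smallestGreaterOrEqual(List<Change> log, long t) {
        int i = firstIndex(log, t, false);
        return i < log.size() ? Long.valueOf(log.get(i).lsn) : null;
    }

    /** 'largest less than or equal': LSN of the last change at or before t, or null. */
    static Long largestLessOrEqual(List<Change> log, long t) {
        int i = firstIndex(log, t, true);
        return i > 0 ? Long.valueOf(log.get(i - 1).lsn) : null;
    }

    public static void main(String[] args) {
        // Changes committed at 100, 200 and 300 ms with LSNs 10, 20 and 30.
        List<Change> log = Arrays.asList(new Change(100, 10), new Change(200, 20), new Change(300, 30));
        long t = 150; // startup.timestamp falls between two commits
        System.out.println(smallestGreaterOrEqual(log, t)); // 20: no commit lies in [150, 200)
        System.out.println(largestLessOrEqual(log, t));     // 10: the commit at 100 would be read again
    }
}
```

With T = 150, the interval [T, T') is [150, 200) and contains no commit, so starting at LSN 20 skips nothing. Starting at LSN 10 replays a change committed before T.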
### Recommendation
The current implementation is correct. However, we can improve the
documentation to clarify the behavior:
```markdown
startup.timestamp: Start capturing changes from the specified timestamp (in milliseconds).
Note: The actual starting position is the first change point at or after the specified timestamp.
```
If there's a genuine use case for including changes before the specified
timestamp, we could consider adding an optional configuration like
`startup.timestamp.inclusive` in the future, but this should not be the default
behavior.