nzw921rx commented on issue #10606:
URL: https://github.com/apache/seatunnel/issues/10606#issuecomment-4072191368
Thanks for the great questions! Here's my proposed design for each:
### 1. Version field behavior
The version field should be **included in both SET and WHERE**, but **NOT
auto-incremented**.
In CDC scenarios, the version value comes from the **source database**, which is authoritative, so the sink should write it through unchanged. The generated SQL should look like:
```sql
-- Current behavior (without optimistic locking)
UPDATE table SET col1 = :col1, version = :version WHERE pk = :pk
-- With optimistic.locking.field = "version"
UPDATE table SET col1 = :col1, version = :version WHERE pk = :pk AND version < :version
```
The key difference is adding `AND version < :version` to the WHERE clause.
This ensures that only **newer** data (a strictly higher version) can overwrite
existing rows. A replayed record carrying the same version matches no row and is skipped.
If the field is a timestamp type (e.g., `update_time`), the same logic
applies — only update when the incoming timestamp is later than the existing
one.
No auto-increment is needed because:
- In CDC mode, the source row already carries the correct version value
- Auto-increment would conflict with the source-of-truth principle
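The WHERE-clause rule can be simulated in memory to show why it makes out-of-order CDC delivery converge on the newest version. This is a minimal sketch, assuming a table reduced to one version value per primary key and non-null versions; `VersionGuardDemo` and `applyIfNewer` are hypothetical names, not SeaTunnel APIs. Because the guard is generic over `Comparable`, the same code covers numeric versions and timestamps such as `java.time.Instant`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of "UPDATE ... WHERE pk = :pk AND version < :version". */
class VersionGuardDemo<K, V extends Comparable<V>> {
    // Stored version per primary key; stands in for the target table.
    private final Map<K, V> table = new HashMap<>();

    /**
     * Inserts a missing row; otherwise updates only if the stored version is
     * strictly older than the incoming one. Returns the number of affected rows.
     */
    int applyIfNewer(K pk, V incomingVersion) {
        V existing = table.get(pk);
        if (existing == null) {
            // No row yet: the insert branch of insert-or-update.
            table.put(pk, incomingVersion);
            return 1;
        }
        if (existing.compareTo(incomingVersion) < 0) {
            // Mirrors "version < :version": the incoming record is newer.
            table.put(pk, incomingVersion);
            return 1;
        }
        // Stale or replayed record: the guarded UPDATE matches 0 rows.
        return 0;
    }

    V versionOf(K pk) {
        return table.get(pk);
    }
}
```

For example, if two parallel readers deliver versions 1, 3 and then 2 for the same key, the calls affect 1, 1 and 0 rows, and the row ends at version 3 regardless of arrival order.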
### 2. Conflict resolution (0 rows affected)
When UPDATE affects **0 rows**, the behavior should be **log warning +
skip** by default.
Reasoning:
- 0 rows affected means the target already holds a version **equal to or newer than**
the incoming record's
- This is **expected behavior** in multi-parallelism CDC, not an error
- Retrying would be pointless (the same stale data would still fail)
- Throwing an exception would kill the job unnecessarily
Additionally, I suggest adding an optional config to control this:
| configKey | Type | Default | Description |
| --- | --- | --- | --- |
| `optimistic.locking.on-conflict` | Enum | `LOG_AND_SKIP` | Behavior when the optimistic lock check fails. Options: `LOG_AND_SKIP`, `THROW_EXCEPTION` |
Default `LOG_AND_SKIP` covers the common case. `THROW_EXCEPTION` is
available for users who want strict consistency and prefer to fail fast.
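The two policies can be sketched as an enum that parses the config value and reacts to a single affected-row count. This is an illustrative sketch only: `OnConflictPolicy`, `fromConfig` and `handle` are hypothetical names, `java.util.logging` stands in for whatever logger the connector uses, and `IllegalStateException` is a placeholder for the connector's own exception type.

```java
import java.util.Locale;
import java.util.logging.Logger;

/** Hypothetical model of the proposed optimistic.locking.on-conflict option. */
enum OnConflictPolicy {
    LOG_AND_SKIP,
    THROW_EXCEPTION;

    private static final Logger LOG = Logger.getLogger(OnConflictPolicy.class.getName());

    /** Parses the config value; an unset option falls back to the proposed default. */
    static OnConflictPolicy fromConfig(String value) {
        if (value == null || value.trim().isEmpty()) {
            return LOG_AND_SKIP;
        }
        // Unknown values make valueOf throw IllegalArgumentException.
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    /**
     * Reacts to the affected-row count of one guarded UPDATE.
     * Returns true if the row was written, false if it was skipped as stale.
     */
    boolean handle(int affectedRows, String rowKey) {
        if (affectedRows > 0) {
            return true;
        }
        String message = "Optimistic lock check failed for row " + rowKey
                + ": target already holds an equal or newer version";
        if (this == THROW_EXCEPTION) {
            // Fail fast: surfaces the conflict and stops the job.
            throw new IllegalStateException(message);
        }
        // Default: warn and move on; retrying the same stale record cannot succeed.
        LOG.warning(message + "; skipping");
        return false;
    }
}
```

Keeping the decision in one place means the executor only has to pass the count and a printable key; neither policy retries, matching the reasoning above.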
### 3. Configuration constraints
**Yes, it should require `enable_upsert = true`** (as the original proposal
stated).
Reasoning:
- Optimistic locking only makes sense for UPDATE operations
- In INSERT-only mode, there's no existing row to compare versions against
- The version check is part of the UPDATE WHERE clause
When `optimistic.locking.field` is set, the implementation should **bypass
native upsert** (e.g., MySQL's `INSERT ON DUPLICATE KEY UPDATE`) and use the
**query-based insert-or-update path** (`InsertOrUpdateByQueryExecutor`). This
is because:
- Native upsert SQL (e.g., `ON DUPLICATE KEY UPDATE`) has no WHERE clause on
the update part — we can't add version comparison
- The query-based path (`getUpdateStatement`) already has a WHERE clause
that we can extend
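The constraint and the forced path can be condensed into one routing decision. This is a simplified, hypothetical model (`ExecutorRouting`, `Path` and `choose` are illustrative names); it does not reproduce SeaTunnel's real executor selection and leaves the existing non-upsert behavior as an opaque `NON_UPSERT` branch.

```java
/** Hypothetical sketch of the routing rules described above. */
final class ExecutorRouting {
    enum Path { NON_UPSERT, NATIVE_UPSERT, QUERY_INSERT_OR_UPDATE }

    private ExecutorRouting() {}

    static Path choose(boolean enableUpsert, boolean dialectHasNativeUpsert, String optimisticLockField) {
        boolean lockingEnabled = optimisticLockField != null && !optimisticLockField.isEmpty();
        if (!enableUpsert) {
            if (lockingEnabled) {
                // The version check lives in the UPDATE's WHERE clause, so upsert must be on.
                throw new IllegalArgumentException("optimistic.locking.field requires enable_upsert = true");
            }
            return Path.NON_UPSERT; // existing behavior, untouched
        }
        if (lockingEnabled) {
            // Native upsert (e.g. ON DUPLICATE KEY UPDATE) has no WHERE on its update part.
            return Path.QUERY_INSERT_OR_UPDATE;
        }
        return dialectHasNativeUpsert ? Path.NATIVE_UPSERT : Path.QUERY_INSERT_OR_UPDATE;
    }
}
```

Rejecting the invalid combination up front surfaces a misconfiguration at startup instead of silently writing without the version check.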
### Implementation Summary
**Changes to `JdbcDialect.getUpdateStatement()`:**
```java
// Inside the JdbcDialect interface; assumes `import static java.lang.String.format;`
default String getUpdateStatement(
        String database,
        String tableName,
        String[] fieldNames,
        String[] conditionFields,
        boolean isPrimaryKeyUpdated,
        String optimisticLockField) { // new parameter; null when optimistic locking is off
    // ... existing logic that builds setClause (unchanged) ...
    String conditionClause =
            Arrays.stream(conditionFields)
                    .map(f -> format("%s = :%s", quoteIdentifier(f), f))
                    .collect(Collectors.joining(" AND "));
    // Append the optimistic lock condition: only strictly newer versions match
    if (optimisticLockField != null) {
        conditionClause +=
                format(" AND %s < :%s", quoteIdentifier(optimisticLockField), optimisticLockField);
    }
    return format(
            "UPDATE %s SET %s WHERE %s",
            tableIdentifier(database, tableName), setClause, conditionClause);
}
```
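To check the SQL this produces, the method can be lifted into a standalone class. This is an approximation, not the dialect code: `OptimisticUpdateSql` is a hypothetical name, MySQL-style backtick quoting is an assumption (real dialects override `quoteIdentifier`), and the `isPrimaryKeyUpdated` flag is dropped by always excluding key columns from SET.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Standalone approximation of the proposed getUpdateStatement. */
final class OptimisticUpdateSql {
    private OptimisticUpdateSql() {}

    // Assumed MySQL-style quoting.
    static String quoteIdentifier(String name) {
        return "`" + name + "`";
    }

    static String updateStatement(String database, String table, String[] fieldNames,
                                  String[] conditionFields, String optimisticLockField) {
        // SET every non-key column; the lock field is written as-is from the source.
        String setClause = Arrays.stream(fieldNames)
                .filter(f -> !Arrays.asList(conditionFields).contains(f))
                .map(f -> String.format("%s = :%s", quoteIdentifier(f), f))
                .collect(Collectors.joining(", "));
        String conditionClause = Arrays.stream(conditionFields)
                .map(f -> String.format("%s = :%s", quoteIdentifier(f), f))
                .collect(Collectors.joining(" AND "));
        if (optimisticLockField != null) {
            // Strictly-newer guard: equal or older incoming versions match 0 rows.
            conditionClause += String.format(" AND %s < :%s",
                    quoteIdentifier(optimisticLockField), optimisticLockField);
        }
        return String.format("UPDATE %s.%s SET %s WHERE %s",
                quoteIdentifier(database), quoteIdentifier(table), setClause, conditionClause);
    }
}
```

With fields `id, col1, version`, key `id` and lock field `version`, this yields ``UPDATE `db`.`t` SET `col1` = :col1, `version` = :version WHERE `id` = :id AND `version` < :version``. Note that `:version` appears twice, so the named-parameter binding has to fill both occurrences with the same value.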
**Changes to `InsertOrUpdateBatchStatementExecutor`:**
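After executing the guarded UPDATE, check the affected-row count: the return value of
`executeUpdate()`, or the per-statement counts returned by `executeBatch()` if updates are
batched. If 0 rows were affected and optimistic locking is enabled, handle it according to
the `on-conflict` policy.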
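When updates go through a JDBC batch, each entry of the `int[]` from `executeBatch()` has to be inspected separately. A minimal sketch, assuming the counts line up with the order rows were added to the batch; `UpdateCountCheck`, `staleIndexes` and `hasUnreportedCounts` are hypothetical names, while `Statement.SUCCESS_NO_INFO` is the standard JDBC constant a driver may return instead of a real count.

```java
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for the update counts of a batched, version-guarded UPDATE. */
final class UpdateCountCheck {
    private UpdateCountCheck() {}

    /** Batch positions whose UPDATE matched no row, i.e. stale under optimistic locking. */
    static List<Integer> staleIndexes(int[] updateCounts) {
        List<Integer> stale = new ArrayList<>();
        for (int i = 0; i < updateCounts.length; i++) {
            if (updateCounts[i] == 0) {
                stale.add(i);
            }
        }
        return stale;
    }

    /** True if the driver reported SUCCESS_NO_INFO, so staleness cannot be detected for some entries. */
    static boolean hasUnreportedCounts(int[] updateCounts) {
        return Arrays.stream(updateCounts).anyMatch(c -> c == Statement.SUCCESS_NO_INFO);
    }
}
```

Each stale index can then be mapped back to its buffered row and passed to the `on-conflict` handling. Whether a driver returns real counts can depend on its batching settings, so `hasUnreportedCounts` gives the executor a way to notice when the check is not possible.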
**Files to modify:**
1. `JdbcSinkOptions` — add `OPTIMISTIC_LOCKING_FIELD` and
`OPTIMISTIC_LOCKING_ON_CONFLICT`
2. `JdbcSinkConfig` — read new options
3. `JdbcDialect.getUpdateStatement()` — accept and apply optimistic lock
field
4. `JdbcOutputFormatBuilder` — pass optimistic lock config to executor,
force query-based path when enabled
5. `InsertOrUpdateBatchStatementExecutor` — handle 0-row-affected scenario
6. Docs (en & zh)
7. Tests