platinumhamburg opened a new issue, #2102:
URL: https://github.com/apache/fluss/issues/2102
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/fluss/issues) and found nothing similar.

### Motivation

Currently, when data in a primary key table is updated, the changelog contains both an `UPDATE_BEFORE` (-U) and an `UPDATE_AFTER` (+U) record for each update. In many scenarios, downstream consumers do not need the previous value, yet they still pay to store and transmit both records.

This feature introduces a new table-level configuration option, `table.changelog.ignore-update-before`, that lets users selectively omit `UPDATE_BEFORE` records, providing the following benefits:

- **Reduced storage costs**: Up to roughly 50% less changelog storage for update-heavy workloads, since each update is written as one record instead of two
- **Lower transmission overhead**: Reduced CDC data transfer costs between systems
- **Improved write performance**: For full-row updates (using `DefaultRowMerger`), the system can skip the RocksDB lookup of the old value entirely

### Solution

### 1. New Configuration Option

Add a new configuration option in `ConfigOptions`:

```java
public static final ConfigOption<Boolean> TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE =
        key("table.changelog.ignore-update-before")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether to ignore UPDATE_BEFORE records in changelog for the primary key table. "
                                + "When disabled (default), update operations produce both UPDATE_BEFORE and UPDATE_AFTER records. "
                                + "When enabled, update operations only produce UPDATE_AFTER records, "
                                + "which reduces storage and transmission costs but loses the ability to track previous values. "
                                + "This option only affects primary key tables.");
```

Provide an accessor method in `TableConfig`:

```java
public boolean isChangelogIgnoreUpdateBefore() {
    return config.get(ConfigOptions.TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE);
}
```

### 2. KvTablet Core Logic

Modify the `KvTablet.putAsLeader()` method to handle the new configuration:

| Scenario | `ignoreUpdateBefore=false` (default) | `ignoreUpdateBefore=true` |
|----------|--------------------------------------|---------------------------|
| Insert | Produce `INSERT` (+I) | Produce `UPDATE_AFTER` (+U)* |
| Full Update (DefaultRowMerger) | Query old value, produce `-U`, `+U` | **Skip old value query**, produce `+U` only |
| Partial Update | Query old value, merge, produce `-U`, `+U` | Query old value, merge, produce `+U` only |
| Delete | Produce `DELETE` (-D) | Produce `DELETE` (-D) (unchanged) |

*Note: When using `DefaultRowMerger` with `ignoreUpdateBefore=true`, the old-value lookup is skipped, so the write cannot tell an insert from an update; for simplicity, inserts are emitted as updates (`+U`).

### 3. Flink Source Adaptation

Update `FlinkTableSource.getChangelogMode()` to return the appropriate `ChangelogMode` based on the configuration:

- When `ignoreUpdateBefore=true` and `deleteBehavior=ALLOW`:
  - Returns: `INSERT`, `UPDATE_AFTER`, `DELETE`
- When `ignoreUpdateBefore=true` and `deleteBehavior!=ALLOW`:
  - Returns: `INSERT`, `UPDATE_AFTER`
- When `ignoreUpdateBefore=false` (default):
  - Original behavior is preserved

### Anything else?

_No response_

### Willingness to contribute

- [x] I'm willing to submit a PR!
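The `putAsLeader()` rules in the section 2 table can be modelled with a small in-memory sketch. Everything here is hypothetical: `ChangeType`, `Change` and `ChangelogRulesSketch` are not Fluss classes, a `HashMap` stands in for RocksDB, and rows are column-to-value maps. It assumes, per the table's footnote, that only the `DefaultRowMerger` path (modelled by `fullRowMerge=true`) turns inserts into `+U`, while partial updates still look up the old value and emit `+I` for new keys.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the changelog record kinds (+I, -U, +U, -D).
enum ChangeType { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// A changelog record: its kind plus the row image it carries.
record Change(ChangeType type, Map<String, String> row) {}

// Simplified, hypothetical model of the proposed putAsLeader() rules.
final class ChangelogRulesSketch {
    private final Map<String, Map<String, String>> kv = new HashMap<>(); // stands in for RocksDB
    private final boolean ignoreUpdateBefore;
    // true models DefaultRowMerger (full-row update); false models a partial update.
    private final boolean fullRowMerge;

    ChangelogRulesSketch(boolean ignoreUpdateBefore, boolean fullRowMerge) {
        this.ignoreUpdateBefore = ignoreUpdateBefore;
        this.fullRowMerge = fullRowMerge;
    }

    List<Change> put(String key, Map<String, String> newRow) {
        List<Change> out = new ArrayList<>();
        if (ignoreUpdateBefore && fullRowMerge) {
            // No old-value lookup at all: insert and update are indistinguishable,
            // so both are emitted as a single +U record.
            kv.put(key, newRow);
            out.add(new Change(ChangeType.UPDATE_AFTER, newRow));
            return out;
        }
        Map<String, String> oldRow = kv.get(key); // the old-value lookup
        if (oldRow == null) {
            kv.put(key, newRow);
            out.add(new Change(ChangeType.INSERT, newRow));
            return out;
        }
        Map<String, String> merged;
        if (fullRowMerge) {
            merged = newRow;
        } else {
            // Partial update: start from the old row, overwrite only the supplied columns.
            merged = new HashMap<>(oldRow);
            merged.putAll(newRow);
        }
        kv.put(key, merged);
        if (!ignoreUpdateBefore) {
            out.add(new Change(ChangeType.UPDATE_BEFORE, oldRow));
        }
        out.add(new Change(ChangeType.UPDATE_AFTER, merged));
        return out;
    }

    List<Change> delete(String key) {
        // Deletes are unaffected by the option; -D is emitted only if the key existed.
        Map<String, String> oldRow = kv.remove(key);
        return oldRow == null ? List.of() : List.of(new Change(ChangeType.DELETE, oldRow));
    }
}
```

For example, with `new ChangelogRulesSketch(true, false)`, a second `put` on an existing key that supplies only some columns returns a single `UPDATE_AFTER` carrying the merged row. Checking the lookup-skipping branch first mirrors the performance argument in the issue: in that mode, writes never read the old value.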

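The `getChangelogMode()` selection in section 3 reduces to a small decision function. This is a hedged sketch with hypothetical names: `Kind` mirrors Flink's `RowKind`, and `DeleteBehavior` values other than `ALLOW` are assumed placeholders (the issue only names `ALLOW`). Because the issue does not spell out the existing (default) behavior, the sketch takes it as a `Supplier` instead of guessing it.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for Flink's RowKind values.
enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Assumed values; the sketch only distinguishes ALLOW from everything else.
enum DeleteBehavior { ALLOW, IGNORE, DISABLE }

final class ChangelogModeSketch {
    private ChangelogModeSketch() {}

    // Returns the row kinds a source would declare for the given table options.
    // originalMode represents today's getChangelogMode() result, which the issue
    // keeps unchanged when the option is disabled.
    static Set<Kind> changelogMode(
            boolean ignoreUpdateBefore,
            DeleteBehavior deleteBehavior,
            Supplier<Set<Kind>> originalMode) {
        if (!ignoreUpdateBefore) {
            return originalMode.get();
        }
        // UPDATE_BEFORE is never produced in this mode.
        Set<Kind> kinds = EnumSet.of(Kind.INSERT, Kind.UPDATE_AFTER);
        if (deleteBehavior == DeleteBehavior.ALLOW) {
            kinds.add(Kind.DELETE);
        }
        return kinds;
    }
}
```

In the real `FlinkTableSource`, each returned set would map onto a Flink `ChangelogMode` built with `ChangelogMode.newBuilder()`, one `addContainedKind(RowKind...)` call per kind, and `build()`.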