platinumhamburg opened a new issue, #2102:
URL: https://github.com/apache/fluss/issues/2102

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/fluss/issues) and 
found nothing similar.
   
   
   ### Motivation
   
   Currently, when updating data in primary key tables, the changelog produces 
both `UPDATE_BEFORE` (-U) and `UPDATE_AFTER` (+U) records for each update 
operation. In many scenarios, downstream consumers do not require the previous 
value information, yet they still incur the cost of storing and transmitting 
both records.
   
   This feature introduces a new table-level configuration option 
`table.changelog.ignore-update-before` that allows users to selectively ignore 
`UPDATE_BEFORE` records, providing the following benefits:
   
   - **Reduced storage costs**: Up to roughly 50% less changelog storage for 
update-heavy workloads, since each update writes one record instead of two
   - **Lower transmission overhead**: Decreased CDC data transfer costs between 
systems
   - **Improved write performance**: For full-row update scenarios (using 
`DefaultRowMerger`), the system can skip the RocksDB lookup for the old value 
entirely
   
   ### Solution
   
   ### 1. New Configuration Option
   
   Add a new configuration option in `ConfigOptions`:
   
   ```java
   public static final ConfigOption<Boolean> TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE =
           key("table.changelog.ignore-update-before")
                   .booleanType()
                   .defaultValue(false)
                   .withDescription(
                           "Whether to ignore UPDATE_BEFORE records in changelog for the primary key table. "
                                   + "When disabled (default), update operations produce both UPDATE_BEFORE and UPDATE_AFTER records. "
                                   + "When enabled, update operations only produce UPDATE_AFTER records, "
                                   + "which reduces storage and transmission costs but loses the ability to track previous values. "
                                   + "This option only affects primary key tables.");
   ```
   
   Provide an accessor method in `TableConfig`:
   
   ```java
   public boolean isChangelogIgnoreUpdateBefore() {
       return config.get(ConfigOptions.TABLE_CHANGELOG_IGNORE_UPDATE_BEFORE);
   }
   ```
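
As a rough illustration of how the option would resolve, here is a minimal sketch that reads the key from a plain properties map. The class name and the use of `Map<String, String>` are assumptions for illustration only; the real lookup goes through `ConfigOptions` and `TableConfig` as shown above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TableConfig; not Fluss code.
class IgnoreUpdateBeforeOptionSketch {
    static final String KEY = "table.changelog.ignore-update-before";

    // Mirrors the proposed default: the option is off unless explicitly set.
    static boolean isChangelogIgnoreUpdateBefore(Map<String, String> tableProperties) {
        return Boolean.parseBoolean(tableProperties.getOrDefault(KEY, "false"));
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(isChangelogIgnoreUpdateBefore(props)); // false
        props.put(KEY, "true");
        System.out.println(isChangelogIgnoreUpdateBefore(props)); // true
    }
}
```

Existing tables that never set the key keep producing both `-U` and `+U`, which is what makes the change backward compatible.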
   
   ### 2. KvTablet Core Logic
   
   Modify the `KvTablet.putAsLeader()` method to handle the new configuration:
   
   | Scenario | `ignoreUpdateBefore=false` (default) | `ignoreUpdateBefore=true` |
   |----------|--------------------------------------|---------------------------|
   | Insert | Produce `INSERT` (+I) | Produce `UPDATE_AFTER` (+U)* |
   | Full Update (DefaultRowMerger) | Query old value, produce `-U`, `+U` | **Skip old value query**, produce `+U` only |
   | Partial Update | Query old value, merge, produce `-U`, `+U` | Query old value, merge, produce `+U` only |
   | Delete | Produce `DELETE` (-D) | Produce `DELETE` (-D) (unchanged) |
   
   *Note: When using `DefaultRowMerger` with `ignoreUpdateBefore=true`, inserts 
are emitted as `+U`. Because the old value lookup is skipped, the tablet cannot 
tell a first write from an overwrite, so both are treated as updates for simplicity.
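
The scenarios in the table can be sketched with a small in-memory model. This is not the actual `KvTablet.putAsLeader()` code: the class, the `HashMap` standing in for RocksDB, the `String[]` rows, and the `lookups` counter are all hypothetical. Two behaviors are assumptions the proposal does not spell out: a partial-update write that finds no old value is reported as `INSERT`, and deleting a missing key emits nothing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative model only; names are hypothetical, not Fluss code.
class ChangelogSketch {
    enum ChangeType { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // In-memory stand-in for the RocksDB-backed store.
    private final Map<String, String[]> store = new HashMap<>();
    private final boolean ignoreUpdateBefore;
    private final boolean fullRowUpdate; // true models DefaultRowMerger
    int lookups = 0; // counts old-value reads

    ChangelogSketch(boolean ignoreUpdateBefore, boolean fullRowUpdate) {
        this.ignoreUpdateBefore = ignoreUpdateBefore;
        this.fullRowUpdate = fullRowUpdate;
    }

    List<ChangeType> put(String key, String[] row) {
        List<ChangeType> out = new ArrayList<>();
        if (ignoreUpdateBefore && fullRowUpdate) {
            // Blind write: no old-value read, so inserts and updates look the same.
            store.put(key, row);
            out.add(ChangeType.UPDATE_AFTER);
            return out;
        }
        lookups++;
        String[] old = store.get(key);
        if (old == null) {
            // Assumption: a looked-up miss is still reported as INSERT.
            store.put(key, row);
            out.add(ChangeType.INSERT);
            return out;
        }
        store.put(key, fullRowUpdate ? row : merge(old, row));
        if (!ignoreUpdateBefore) {
            out.add(ChangeType.UPDATE_BEFORE);
        }
        out.add(ChangeType.UPDATE_AFTER);
        return out;
    }

    List<ChangeType> delete(String key) {
        List<ChangeType> out = new ArrayList<>();
        // Assumption: deleting a missing key emits nothing.
        if (store.remove(key) != null) {
            out.add(ChangeType.DELETE);
        }
        return out;
    }

    String[] get(String key) {
        return store.get(key);
    }

    // Partial update: null columns in the new row keep their old value.
    private static String[] merge(String[] old, String[] update) {
        String[] merged = old.clone();
        for (int i = 0; i < update.length && i < merged.length; i++) {
            if (update[i] != null) {
                merged[i] = update[i];
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        ChangelogSketch tablet = new ChangelogSketch(true, true);
        System.out.println(tablet.put("k1", new String[] {"1", "a"})); // [UPDATE_AFTER]
        System.out.println(tablet.put("k1", new String[] {"1", "b"})); // [UPDATE_AFTER]
        System.out.println("lookups=" + tablet.lookups); // lookups=0
    }
}
```

The `lookups` counter makes the write-path saving visible: only the full-row path with `ignoreUpdateBefore=true` avoids the read, while partial updates still need the old row to merge against.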
   
   ### 3. Flink Source Adaptation
   
   Update `FlinkTableSource.getChangelogMode()` to return the appropriate 
`ChangelogMode` based on the configuration:
   
   - When `ignoreUpdateBefore=true` and `deleteBehavior=ALLOW`:
     - Returns: `INSERT`, `UPDATE_AFTER`, `DELETE`
   - When `ignoreUpdateBefore=true` and `deleteBehavior!=ALLOW`:
     - Returns: `INSERT`, `UPDATE_AFTER`
   - When `ignoreUpdateBefore=false` (default):
     - Original behavior preserved
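
The selection rules above can be sketched as a small pure function. The `RowKind` enum here is a local mirror of Flink's row kinds so the example runs without Flink on the classpath, and `deleteAllowed` is a hypothetical boolean standing for `deleteBehavior=ALLOW`. Since the proposal does not restate the original mode, the sketch takes it as a parameter and returns it unchanged when the option is off.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative only; not the actual FlinkTableSource code.
class ChangelogModeSketch {
    // Local mirror of Flink's row kinds, to keep the sketch self-contained.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    static Set<RowKind> changelogMode(
            boolean ignoreUpdateBefore, boolean deleteAllowed, Set<RowKind> originalMode) {
        if (!ignoreUpdateBefore) {
            // Default: original behavior preserved.
            return originalMode;
        }
        EnumSet<RowKind> kinds = EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER);
        if (deleteAllowed) {
            kinds.add(RowKind.DELETE);
        }
        return kinds;
    }

    public static void main(String[] args) {
        Set<RowKind> original = EnumSet.allOf(RowKind.class);
        System.out.println(changelogMode(true, true, original));  // [INSERT, UPDATE_AFTER, DELETE]
        System.out.println(changelogMode(true, false, original)); // [INSERT, UPDATE_AFTER]
    }
}
```

In the real source, the same set would presumably be built with Flink's `ChangelogMode.newBuilder().addContainedKind(...)` rather than an `EnumSet`.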
   
   ### Anything else?
   
   _No response_
   
   ### Willingness to contribute
   
   - [x] I'm willing to submit a PR!


