alexeykudinkin commented on code in PR #6256:
URL: https://github.com/apache/hudi/pull/6256#discussion_r971371531


##########
rfc/rfc-51/rfc-51.md:
##########
@@ -62,73 +63,79 @@ We follow the debezium output format: four columns as shown below
 - u: represents `update`; when `op` is `u`, neither `before` nor `after` is null;
 - d: represents `delete`; when `op` is `d`, `after` is always null;
 
-Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
+**Note**
 
-## Goals
+* In case of the same record having operations like insert -> delete -> insert, CDC data should be produced to reflect the exact behaviors.
+* The illustration above ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+## Design Goals
 
-## Implementation
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines

Review Comment:
   Let's also explicitly call out that:
   
   - For CDC-enabled tables, the performance of non-CDC queries should not be affected
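As a toy sketch of the insert -> delete -> insert behavior called out in the hunk above, the debezium-style rows for a single key might look like this (all field values hypothetical):

```python
# Debezium-style change rows: op, ts_ms, before, after, per the four-column
# format described in the RFC. All values below are made up for illustration.
def cdc_row(op, before, after, ts_ms):
    """Build one change row, enforcing the invariants from the RFC:
    i -> before is null; u -> neither is null; d -> after is null."""
    invariants = {
        "i": before is None and after is not None,
        "u": before is not None and after is not None,
        "d": after is None,
    }
    assert invariants[op]
    return {"op": op, "ts_ms": ts_ms, "before": before, "after": after}

# The same key inserted, deleted, then re-inserted must yield three distinct
# CDC rows in order, not a collapsed net result.
rows = [
    cdc_row("i", None, {"id": 1, "name": "a"}, 1000),
    cdc_row("d", {"id": 1, "name": "a"}, None, 2000),
    cdc_row("i", None, {"id": 1, "name": "a2"}, 3000),
]
```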



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -64,69 +65,72 @@ We follow the debezium output format: four columns as shown below
 
 Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-## Goals
+## Design Goals
 
 1. Support row-level CDC records generation and persistence;
 2. Support both MOR and COW tables;
 3. Support all the write operations;
 4. Support Spark DataFrame/SQL/Streaming Query;
 
-## Implementation
+## Configurations
 
-### CDC Architecture
+| key                                                 | default | description |
+|-----------------------------------------------------|---------|-------------|
+| hoodie.table.cdc.enabled                            | `false` | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly. |
+| hoodie.table.cdc.supplemental.logging               | `false` | If `true`, persist the required information about the changed data, including `before`. If `false`, only `op` and record keys will be persisted. |
+| hoodie.table.cdc.supplemental.logging.include_after | `false` | If `true`, persist `after` as well. |
 
-![](arch.jpg)
+To perform CDC queries, users need to set `hoodie.table.cdc.enabled=true` and `hoodie.datasource.query.type=incremental`.
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+| key                                    | default    | description                          |
+|----------------------------------------|------------|--------------------------------------|
+| hoodie.table.cdc.enabled               | `false`    | set to `true` for CDC queries        |
+| hoodie.datasource.query.type           | `snapshot` | set to `incremental` for CDC queries |
+| hoodie.datasource.read.start.timestamp | -          | required.                            |
+| hoodie.datasource.read.end.timestamp   | -          | optional.                            |
 
-![](points.jpg)
+### Logical File Types
 
-### Config Definitions
+We define 4 logical file types for the CDC scenario.

Review Comment:
   Agree w/ @xushiyan's proposal, let's simplify this -- having a table mapping an action on the Data table into the action on the CDC log makes the message much clearer.
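For illustration only, the query-side configs in the hunk above might be wired up from PySpark roughly like this (the table path and timestamp values are hypothetical placeholders, and the option keys follow the draft tables above):

```python
# Options for a CDC-style incremental read, taken from the config tables in
# this hunk. The timestamp values and table path are made-up placeholders.
cdc_read_opts = {
    "hoodie.table.cdc.enabled": "true",
    "hoodie.datasource.query.type": "incremental",
    "hoodie.datasource.read.start.timestamp": "20220901000000",  # required
    "hoodie.datasource.read.end.timestamp": "20220902000000",    # optional
}

# With a live SparkSession, the read would then look something like:
#   df = spark.read.format("hudi").options(**cdc_read_opts).load("/tmp/hudi_cdc_table")
```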
   



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -148,20 +155,46 @@ hudi_cdc_table/
 
 Under a partition directory, the `.log` file with `CDCBlock` above will keep the changing data we have to materialize.
 
-There is an option to control what data is written to `CDCBlock`, that is `hoodie.table.cdc.supplemental.logging`. See the description of this config above.
+#### Persisting CDC in MOR: Write-on-indexing vs Write-on-compaction
+
+There are 2 design choices for when to persist CDC in MOR tables:
+
+Write-on-indexing allows CDC info to be persisted at the earliest point; however, in the case of the Flink writer or Bucket indexing, `op` (I/U/D) data is not available at indexing time.
+
+Write-on-compaction can always persist CDC info and achieves standardization of implementation logic across engines; however, it adds some delay to the CDC query results. Based on the business requirements, Log Compaction (RFC-48) or scheduling more frequent compactions can be used to minimize the latency.
 
-Spark DataSource example:
+The semantics we propose to establish are: when base files are written, the corresponding CDC data is also persisted.

Review Comment:
   I don't think I understand the proposal here: are we saying that we're not going to be producing the CDC records until compaction is performed?
   
   I think this is a serious consistency gap: after a Delta Commit, records are already persisted and visible to queries, hence someone can actually read the table and see these records persisted, but when issuing a CDC query they won't see these records.
   
   Can you please elaborate on what the challenge is here? What's the issue w/ the Flink writer? Why does Bucket Index affect this?
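The gap described above can be sketched with a toy model (names and structure are hypothetical, not Hudi APIs):

```python
# Toy timeline of the concern: after a delta commit, records are visible to
# regular (snapshot/incremental) reads, but if CDC blocks are only written at
# compaction time, a CDC query issued in between sees nothing for them.
snapshot_visible = []
cdc_visible = []

def delta_commit(records):
    snapshot_visible.extend(records)  # readable immediately

def compact(records):
    cdc_visible.extend(records)       # CDC rows materialized only here

delta_commit(["r1", "r2"])
# Between the delta commit and compaction, these records are readable but
# absent from CDC results:
missing_from_cdc = [r for r in snapshot_visible if r not in cdc_visible]
compact(["r1", "r2"])
```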



##########
rfc/rfc-51/rfc-51.md:
##########
@@ -62,73 +63,79 @@ We follow the debezium output format: four columns as shown below
 - u: represents `update`; when `op` is `u`, neither `before` nor `after` is null;
 - d: represents `delete`; when `op` is `d`, `after` is always null;
 
-Note: the illustration here ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
+**Note**
 
-## Goals
+* In case of the same record having operations like insert -> delete -> insert, CDC data should be produced to reflect the exact behaviors.
+* The illustration above ignores all the Hudi metadata columns like `_hoodie_commit_time` in `before` and `after` columns.
 
-1. Support row-level CDC records generation and persistence;
-2. Support both MOR and COW tables;
-3. Support all the write operations;
-4. Support Spark DataFrame/SQL/Streaming Query;
+## Design Goals
 
-## Implementation
+1. Support row-level CDC records generation and persistence
+2. Support both MOR and COW tables
+3. Support all the write operations
+4. Support incremental queries in CDC format across supported engines
 
-### CDC Architecture
+## Configurations
 
-![](arch.jpg)
+| key                                        | default  | description |
+|--------------------------------------------|----------|-------------|
+| hoodie.table.cdc.enabled                   | `false`  | The master switch of the CDC features. If `true`, writers and readers will respect CDC configurations and behave accordingly. |
+| hoodie.table.cdc.supplemental.logging.mode | `KEY_OP` | A mode to indicate the level of changed data being persisted. At the minimum level, `KEY_OP` indicates changed records' keys and operations to be persisted. `DATA_BEFORE`: persist records' before-images in addition to `KEY_OP`. `DATA_BEFORE_AFTER`: persist records' after-images in addition to `DATA_BEFORE`. |
 
-Note: Table operations like `Compact`, `Clean`, `Index` do not write/change any data. So we don't need to consider them in CDC scenario.
- 
-### Modifiying code paths
+To perform CDC queries, users need to set `hoodie.datasource.query.incremental.format=cdc` and `hoodie.datasource.query.type=incremental`.
 
-![](points.jpg)
+| key                                        | default        | description |
+|--------------------------------------------|----------------|-------------|
+| hoodie.datasource.query.type               | `snapshot`     | set to `incremental` for incremental query. |
+| hoodie.datasource.query.incremental.format | `latest_state` | `latest_state` (current incremental query behavior) returns the latest records' values. Set to `cdc` to return the full CDC results. |
+| hoodie.datasource.read.begin.instanttime   | -              | required. |
+| hoodie.datasource.read.end.instanttime     | -              | optional. |
 
-### Config Definitions
+### Logical File Types
 
-Define a new config:
-
-| key | default | description |
-| --- | --- | --- |
-| hoodie.table.cdc.enabled | false | `true` represents the table to be used for CDC queries and will write cdc data if needed. |
-| hoodie.table.cdc.supplemental.logging | true | If true, persist all the required information about the change data, including 'before' and 'after'. Otherwise, just persist the 'op' and the record key. |
-
-Other existing config that can be reused in cdc mode is as following:
-Define another query mode named `cdc`, which is similar to `snapshpt`, `read_optimized` and `incremental`.
-When read in cdc mode, set `hoodie.datasource.query.type` to `cdc`.
-
-| key | default  | description |
-| --- |---| --- |
-| hoodie.datasource.query.type | snapshot | set to cdc, enable the cdc quey mode |
-| hoodie.datasource.read.start.timestamp | -        | requried. |
-| hoodie.datasource.read.end.timestamp | -        | optional. |
-
-
-### CDC File Types
-
-Here we define 5 cdc file types in CDC scenario.
+We define 4 logical file types for the CDC scenario.
 
 - CDC_LOG_File: a file consisting of CDC Blocks with the changing data related to one commit.
-  - when `hoodie.table.cdc.supplemental.logging` is true, it keeps all the fields about the change data, including `op`, `ts_ms`, `before` and `after`. When query hudi table in cdc query mode, load this file and return directly.
-  - when `hoodie.table.cdc.supplemental.logging` is false, it just keeps the `op` and the key of the changing record. When query hudi table in cdc query mode, we need to load the previous version and the current one of the touched file slice to extract the other info like `before` and `after` on the fly.
+  - For COW tables, this file type refers to newly written log files alongside base files. The log files in this case only contain CDC info.
+  - For MOR tables, this file type refers to the typical log files in MOR tables. CDC info will be persisted as log blocks in the log files.
 - ADD_BASE_File: a normal base file for a specified instant and a specified file group. All the data in this file is new-incoming. For example, when we first write data to a new file group, we can load this file, treat each record in it as the value of `after`, with the value of `op` for each record being `i`.
 - REMOVE_BASE_FILE: a normal base file for a specified instant and a specified file group, but this file is empty. A file like this is generated when we delete all the data in a file group. We need to find the previous version of the file group, load it, treat each record in it as the value of `before`, with the value of `op` for each record being `d`.
 - MOR_LOG_FILE: a normal log file. For this type, we need to load the previous version of the file slice, and merge each record in the log file with the separately loaded data to determine how the record has changed, and get the values of `before` and `after`.
 - REPLACED_FILE_GROUP: a file group that is replaced entirely, as by `DELETE_PARTITION` and `INSERT_OVERWRITE` operations. We load this file group, treat all the records as the value of `before`, with the value of `op` for each record being `d`.
 
 Note:
 
-- **Only `CDC_LOG_File` is a new file type and written out by CDC**. The `ADD_BASE_File`, `REMOVE_BASE_FILE`, `MOR_LOG_FILE` and `REPLACED_FILE_GROUP` are just representations of the existing data files in the CDC scenario. For some examples:
-  - `INSERT` operation will maybe create a list of new data files. These files will be treated as ADD_BASE_FILE;
-  - `DELETE_PARTITION` operation will replace a list of file slice. For each of these, we get the cdc data in the `REPLACED_FILE_GROUP` way.
+**`CDC_LOG_File` is a new file type written out for CDC**. `ADD_BASE_File`, `REMOVE_BASE_FILE`, and `REPLACED_FILE_GROUP` represent the existing data files in the CDC scenario.
 
-### Write
+For example:
+- An `INSERT` operation may create a list of new data files. These files will be treated as `ADD_BASE_FILE`;
+- A `DELETE_PARTITION` operation will replace a list of file slices. For each of these, we get the CDC data in the `REPLACED_FILE_GROUP` way.
+
+## When `supplemental.logging.mode=KEY_OP`
+
+In this mode, we minimize the additional storage for CDC information.
 
-The idea is to **Write CDC files as little as possible, and reuse data files as much as possible**.
+- On write, only the change type `op`s and record keys are persisted.
+- On read, change info will be inferred on the fly, which costs more computation power. As `op`s and record keys are available, inference using current and previous committed data will be optimized by reducing the IO cost of reading previously committed data, i.e., only reading changed records.
 
-Hudi writes data by `HoodieWriteHandle`.
-We notice that only `HoodieMergeHandle` and it's subclasses will receive both the old record and the new-coming record at the same time, merge and write.
-So we will add a `LogFormatWriter` in these classes. If there is CDC data need to be written out, then call this writer to write out a log file which consist of `CDCBlock`.
-The CDC log file will be placed in the same position as the base files and other log files, so that the clean service can clean up them without extra work. The file structure is like:
+The detailed logical flows for write and read scenarios are the same regardless of `logging.mode`, and will be illustrated in the section below.
+
+## When `supplemental.logging.mode=DATA_BEFORE` or `DATA_BEFORE_AFTER`
+
+Overall logic flows are illustrated below.
+
+![](logic-flows.jpg)
+
+### Write
+
+Hudi writes data by `HoodieWriteHandle`. We notice that only `HoodieMergeHandle` and its subclasses will receive both the old record and the new-coming record at the same time, merge, and write. So we will add a `LogFormatWriter` in these classes. If there is CDC data to be written out, this writer is called to write out a log file consisting of `CDCBlock`s. The CDC log file will be placed in the same location as the base files and other log files, so that the clean service can clean them up without extra work. The file structure is like:

Review Comment:
   Understood. I appreciate the intent, but I think we should prioritize querying performance over how much we need to tweak the cleaner to support it.
   
   The problem w/ the existing approach is that we can't cleanly separate out CDC/Data log-files w/o having separate naming schemes for them -- even though we might be creating a new CDC log file for every write we do, whenever we do a Delta Commit (on HDFS) we will be appending the log-blocks to the latest log-file present w/o checking whether it's a CDC or Data log-file.
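Separately from the naming concern, the `KEY_OP` inference described earlier in this hunk (reconstructing `before`/`after` by looking up persisted keys in the previous and current file slices) could be sketched roughly as follows; all data structures and values here are hypothetical, not Hudi internals:

```python
# KEY_OP mode persists only (op, record key); before/after images are inferred
# on read by probing the previous and current committed file slices, touching
# only the changed keys rather than scanning everything.
prev_slice = {1: {"id": 1, "v": "a"}, 2: {"id": 2, "v": "b"}}   # previous commit
curr_slice = {1: {"id": 1, "v": "a2"}, 3: {"id": 3, "v": "c"}}  # current commit
key_ops = [("u", 1), ("d", 2), ("i", 3)]  # what KEY_OP mode stores

def infer(op, key):
    # Inserts have no before-image; deletes have no after-image.
    before = prev_slice.get(key) if op in ("u", "d") else None
    after = curr_slice.get(key) if op in ("i", "u") else None
    return {"op": op, "before": before, "after": after}

cdc_rows = [infer(op, k) for op, k in key_ops]
```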



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
