danny0405 commented on code in PR #17827:
URL: https://github.com/apache/hudi/pull/17827#discussion_r2688910870


##########
rfc/rfc-103/rfc-103.md:
##########
@@ -0,0 +1,246 @@
+   <!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-103: Hudi LSM tree layout
+
+## Proposers
+
+- @zhangyue19921010
+- @xushiyan
+
+## Approvers
+- @danny0405
+- @vinothchandar
+
+## Status
+
+Main issue: https://github.com/apache/hudi/issues/14310
+
+## Background
+
+LSM trees (Log-Structured Merge-trees) are data structures optimized for write-intensive workloads and are widely used in modern storage systems such as Paimon, LevelDB, RocksDB, and Cassandra. By leveraging sequential writes and a tiered merge (compaction) mechanism, they offer clear advantages in:
+
+- **High write throughput**
+- **Efficient, tiered compaction**
+- **Optimized read paths**
+
+## Goal
+
+This RFC proposes applying LSM-inspired principles (**sequential writes + tiered merges**) to the data organization protocol of **Hudi MOR tables**, and replacing **Avro** with **Parquet** as the on-disk format for individual log files, in order to:
+
+1. Improve the **read performance**, **write performance**, and **overall 
stability** of MOR tables—especially for **wide tables**—in scenarios such as:
+   - predicate pushdown
+   - point lookups
+   - column/data pruning
+2. Improve the **performance** and **stability** of MOR **compaction**
+3. Increase the **compression ratio** of log files
+
+## Design Overview
+
+![01-lsm-tree-layout-overview](01-lsm-tree-layout-overview.png)
+
+The core idea is to treat, **within each file group**:
+
+- **Log files** as **Level-0 (L0)** of an LSM tree
+- The only **Base file** as **Level-1 (L1)**
+
+The file naming formats for base and log files should remain unchanged.
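+
+To make the mapping concrete, the listing below illustrates a single file group under the proposed layout. File names follow the existing Hudi naming conventions; the file ID, write token, and instant times are made up for illustration:
+
+```text
+partition_path/
+  # L1: the single base file of the file group (fully sorted by record key)
+  fileId1_0-1-0_20240101000000000.parquet
+  # L0: log files; each is internally sorted and deduplicated, but
+  # duplicates may still appear across log files
+  .fileId1_20240101000000000.log.1_0-1-0
+  .fileId1_20240101000000000.log.2_0-1-0
+```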
+
+To realize this layout:
+
+- Records inside **log and base files must be sorted** (**Core Feature 1**)
+- Records should be deduplicated before writing to any log file, i.e., no duplicates within a single log file; duplicates may still appear across log files.
+- Existing services should implement **sorted merge-based compaction**:
+  - **log-compaction** handles **L0 compaction**
+  - **compaction table service** handles **L0 → L1 compaction**
+  - both use a **sorted merge algorithm** (**Core Feature 2**)
+
+## Considerations
+
+### Table configs
+
+The layout should be enforced by a table property, e.g. `hoodie.table.storage.layout=base_log|lsm_tree` (default `base_log`, which is the current base/log file organization):
+
+- The config is not allowed to be set to `lsm_tree` for an existing table
+- The config is allowed to be set to `base_log` for an existing table
+
+The layout is only applicable to MOR tables and does not apply to COW. When the layout config is set on a COW table, the persisted layout value will always be `base_log`.
+
+When a MOR table with the LSM tree layout enabled is migrated to COW, the layout config will automatically be reset to `base_log`.
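+
+A minimal sketch of choosing the layout at table creation time. The key `hoodie.table.storage.layout` is the property proposed in this RFC (it does not exist in current Hudi releases); the other keys are standard Hudi write configs:
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+
+// Sketch only: "hoodie.table.storage.layout" is proposed by this RFC.
+Map<String, String> hudiOptions = new HashMap<>();
+hudiOptions.put("hoodie.table.name", "orders");
+hudiOptions.put("hoodie.datasource.write.table.type", "MERGE_ON_READ");
+// Must be chosen at table creation time: an existing table cannot be
+// switched from base_log to lsm_tree.
+hudiOptions.put("hoodie.table.storage.layout", "lsm_tree");
+```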
+
+### Engine-agnostic
+
+The layout should be engine-agnostic. Writer engines can build on a shared implementation and add engine-specific logic to conform to the layout.
+
+For example, since Flink writers use buffer sort, the Flink sink must flush sorted records into a single file to guarantee file-level ordering.
+
+### Write operations
+
+Write operations should remain semantically unchanged when the layout is 
enabled.
+
+In MOR tables, when **small file handling** occurs, inserts may be bin-packed into file slices without log files, creating a new base file; in this case, the **sorted write** must be applied.
+
+The most performant writer setup for the LSM tree layout is bucket index + bulk insert, which best utilizes sorted merging. The downside is that small files may proliferate, which can be mitigated by log compaction. A hedged configuration sketch follows:
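+
+All writer keys below are existing Hudi configs; the values are illustrative only:
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+
+// Bucket index + bulk insert: the writer setup this RFC recommends for the
+// LSM layout. Values are illustrative.
+Map<String, String> writerOptions = new HashMap<>();
+writerOptions.put("hoodie.datasource.write.operation", "bulk_insert");
+writerOptions.put("hoodie.index.type", "BUCKET");
+// Buckets per partition: tune to balance file sizes against parallelism.
+writerOptions.put("hoodie.bucket.index.num.buckets", "8");
+// Existing sort modes for bulk_insert; the sorted-write requirement of
+// Core Feature 1 builds on this kind of pre-write sorting.
+writerOptions.put("hoodie.bulkinsert.sort.mode", "GLOBAL_SORT");
+```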
+
+### Indexes
+
+Writer indexes should still function as is under this layout. Same for reader 
indexes.
+
+### Clustering
+
+Clustering will be restricted to **record key sorting** only. 
+
+For **MOR + bucket index** setup, clustering is typically not needed.
+
+## Core Feature 1: Sorted Write
+
+All writes are sorted. That is, within any written file (**base or log**), 
records are fully sorted by key(s).
+
+### Initial support (v1)
+
+- `bulk_insert`
+- `insert_overwrite`
+- with **bucket index**
+
+### Future support
+
+- `insert`, `upsert`
+- other index types
+
+### Example: Flink Streaming Write Pipeline
+
+![02-write-with-disruptor-buffer-sort](02-write-with-disruptor-buffer-sort.png)
+
+The write pipeline mainly consists of four core stages:
+
+- **Repartitioning**
+- **Sorting**
+- **Deduplication**
+- **I/O**
+
+Optimizations:
+
+1. **Asynchronous processing architecture**  
+   Introduce a **Disruptor ring buffer** within the sink operator to decouple 
production and consumption, significantly improving throughput and handling 
cases where the producer outpaces the consumer.
+
+2. **Efficient memory management**  
+   Integrate Flink’s built-in **MemorySegmentPool** with **BinaryInMemorySortBuffer** to enable fine-grained memory control and efficient sorting, greatly reducing GC pressure and sorting overhead. A simplified sketch of the sort-then-flush flow follows.
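+
+A minimal, self-contained sketch of the sort-then-flush contract (not the actual Flink operator: the real pipeline uses the Disruptor ring buffer and Flink's `BinaryInMemorySortBuffer` rather than heap collections; `BufferedRecord` is a hypothetical stand-in for the sink's record type, and last-write-wins deduplication is assumed for illustration):
+
+```java
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+// Hypothetical stand-in for the rows buffered inside the Flink sink.
+record BufferedRecord(String recordKey, byte[] payload) {}
+
+class SortedFlushBuffer {
+  private final List<BufferedRecord> buffer = new ArrayList<>();
+
+  void add(BufferedRecord r) {
+    buffer.add(r);
+  }
+
+  // On flush: sort by record key, deduplicate (last write wins), and emit
+  // one fully sorted, duplicate-free run that becomes a single log file.
+  List<BufferedRecord> flush() {
+    buffer.sort(Comparator.comparing(BufferedRecord::recordKey));
+    Map<String, BufferedRecord> deduped = new LinkedHashMap<>();
+    for (BufferedRecord r : buffer) {
+      deduped.put(r.recordKey(), r); // later duplicates overwrite earlier
+    }
+    buffer.clear();
+    return new ArrayList<>(deduped.values());
+  }
+}
+```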
+
+## Core Feature 2: Sorted Merge Read / Compaction
+
+![03-k-way-merging](03-k-way-merging.png)
+
+During read and compaction, merging is performed using a **sorted merge algorithm** (e.g., a **loser tree** for k-way merge); a minimal sketch appears after the list below.
+
+- Resulting **log files** contain fully sorted records
+- Resulting **base files** contain fully sorted records
+- File group reads reuse the same sorted merge logic, with **predicate 
pruning** applied when present
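+
+A minimal k-way sorted merge sketch over record keys using a binary heap (`PriorityQueue`); the loser tree proposed here plays the same role with fewer comparisons per element. Combining duplicate keys through the Record Merger APIs is out of scope for the sketch and only marked in a comment:
+
+```java
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+class KWayMergeSketch {
+  // One heap entry per input run: the run's current key plus the run index.
+  private record Entry(String key, int run) {}
+
+  // Merges k iterators of already-sorted record keys into one sorted stream.
+  static List<String> merge(List<Iterator<String>> runs) {
+    PriorityQueue<Entry> heap = new PriorityQueue<>(
+        Comparator.comparing(Entry::key).thenComparingInt(Entry::run));
+    for (int i = 0; i < runs.size(); i++) {
+      if (runs.get(i).hasNext()) {
+        heap.add(new Entry(runs.get(i).next(), i));
+      }
+    }
+    List<String> merged = new ArrayList<>();
+    while (!heap.isEmpty()) {
+      Entry e = heap.poll();
+      merged.add(e.key()); // a record merger would combine duplicate keys here
+      if (runs.get(e.run()).hasNext()) {
+        heap.add(new Entry(runs.get(e.run()).next(), e.run()));
+      }
+    }
+    return merged;
+  }
+}
+```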
+
+### Implementation tasks
+
+- Implement sorted merge: **Loser tree** for **k-way merge**
+- Reuse existing **Record Merger APIs**
+- Update the following components to use sorted merge:
+  - Log compaction
+  - Compaction runner (L0 → L1)
+  - File group reader
+
+---
+
+## Additional (Orthogonal) Features
+
+These features are not strictly required by the LSM layout itself, but they amplify its benefits and further optimize performance and user experience.
+
+### 1) Parquet as Log File Format
+
+**Benefits**
+
+- Vectorized processing
+- Better compression than Avro
+- Support pruning during reads
+
+Switching log file format from Avro to Parquet requires the following changes:
+
+0. **Naming**: the Parquet log file naming format should remain consistent with existing Avro logs to ensure compatibility with existing MOR tables
+1. **Writer changes**: Block append operations are no longer supported. During writes, input data is sorted and deduplicated, then written directly to new Parquet files using a Create handler (see the hedged sketch after this list):
+   - For **Spark**: reuse the bulk insert write logic
+   - For **Flink**: refactor the upsert write logic. Data preparation, 
metadata field addition, and sorting logic can be reused, but the final write 
should use the Parquet Create Handler to write new Parquet log files
+2. **Reader changes**: When reading Parquet log files, skip the logic for 
handling delete blocks and damaged blocks. Read data directly using the Parquet 
Log Reader, enabling optimizations such as vectorized reads and column pruning
+3. **Markers**: Implement a new MOR marker write mechanism. Create markers are 
written during writes, similar to COW create markers
+4. **Rollback**: Handle both Marker-Based Rollback and Listing-Based Rollback 
scenarios:
+   - For MOR Parquet logs, damaged files are deleted directly (similar to COW)
+5. **Cleaning**: MOR Parquet log file cleaning directly deletes the 
corresponding Parquet log files (similar to COW Parquet)
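+
+A hedged sketch of the direct Parquet log write from the writer changes above. It uses the standard `parquet-avro` API directly; the actual implementation would go through a Hudi Create handler and the existing log file naming scheme, and `ParquetLogWriteSketch` is a hypothetical name:
+
+```java
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+class ParquetLogWriteSketch {
+  // Writes a batch that is already sorted by record key and deduplicated
+  // (per Core Feature 1) as one new Parquet log file; there is no block
+  // framing, so readers keep vectorized reads and column pruning.
+  static void writeSortedBatch(Path logFilePath, Schema schema,
+                               List<GenericRecord> sortedRecords) throws Exception {
+    try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
+        .<GenericRecord>builder(logFilePath)
+        .withSchema(schema)
+        .withCompressionCodec(CompressionCodecName.SNAPPY)
+        .build()) {
+      for (GenericRecord record : sortedRecords) {
+        writer.write(record);
+      }
+    }
+  }
+}
+```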
+
+
+**Behavior changes**
+
+- MOR **rollback** deletes Parquet log files directly, instead of appending a 
delete block.
+- For **cleaning**, Parquet log files are deleted.

Review Comment:
   can we clarify whether we have multiple files in the L1 layer?


