danny0405 commented on code in PR #17827: URL: https://github.com/apache/hudi/pull/17827#discussion_r2757311739
########## rfc/rfc-103/rfc-103.md: ##########
@@ -0,0 +1,311 @@
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->
# RFC-103: Hudi LSM tree layout

## Proposers

- @zhangyue19921010
- @xushiyan

## Approvers

- @danny0405
- @vinothchandar

## Status

Main issue: https://github.com/apache/hudi/issues/14310

## Background

LSM Trees (Log-Structured Merge-Trees) are data structures optimized for write-intensive workloads and are widely used in modern database systems such as Paimon, LevelDB, RocksDB, Cassandra, etc. By leveraging sequential writes and a tiered merge (compaction) mechanism, they offer clear advantages in:

- **High write throughput**
- **Efficient, tiered compaction**
- **Optimized read paths**

## Goal

This RFC proposes applying LSM-inspired principles (**sequential writes + tiered merges**) to improve the data organization protocol for **Hudi MOR tables**, and replacing **Avro** with **Parquet** as the on-disk format for individual log files, in order to:

1. Improve the **read performance**, **write performance**, and **overall stability** of MOR tables—especially for **wide tables**—in scenarios such as:
   - predicate pushdown
   - point lookups
   - column/data pruning
2. Improve the **performance** and **stability** of MOR **compaction**
3. Increase the **compression ratio** of log files

## Design Overview

The core idea is to treat, **within each file group**:

- **Log files** as **Level-0 (L0)** of an LSM tree
- The single **base file** as **Level-1 (L1)**

The file naming formats for base and log files should remain unchanged.

To realize this layout:

- Records inside **log and base files must be sorted by record key(s)** (**Core Feature 1**)
- Records should be deduplicated before writing to any log file, i.e., there are no duplicates within a single log file; duplicates may still exist across log files
- Existing services should implement **sorted merge-based compaction**:
  - **log-compaction** handles **L0 compaction**
  - the **compaction table service** handles **L0 → L1 compaction**
  - both use a **sorted merge algorithm** (**Core Feature 2**)

## Considerations

### Table configs

The layout should be enforced by a table property, e.g. `hoodie.table.storage.layout=default|lsm_tree` (default value: `default`, which is the current base/log file organization):

- The config is not allowed to be set to `lsm_tree` for an existing table
- The config is allowed to be set to `default` for an existing table
- The config is set to `default` by default

The layout is only applicable to MOR tables, not to COW. When the layout config is set for a COW table, the persisted layout config always falls back to `default`.

When an LSM-tree-layout-enabled MOR table is migrated to COW, the layout config will automatically be set to `default`.

### Engine-agnostic

The layout should be engine-agnostic. Writer engines can make use of the shared implementation and add engine-specific logic or design to conform to the layout.

For example, since Flink writers use buffer sort, the Flink sink must flush sorted records into a single file to guarantee file-level ordering.

### Write operations

Write operations should remain semantically unchanged when the layout is enabled.

In MOR tables, when **small file handling** occurs, inserts may be bin-packed into file slices without log files, creating a new base file; the **sorted write** must be applied in that case as well.

The most performant writer setup for the LSM tree layout is bucket index + bulk insert, which best utilizes sorted merging (see the sketch below). The downside is that small files may proliferate, which can be mitigated by log compaction.
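As an illustration of the bucket index + bulk insert setup called out above, the following sketch writes to a new MOR table through the Spark DataSource. All option keys except `hoodie.table.storage.layout` are existing Hudi configs; that key is only the property proposed by this RFC, and the table name, column names (`id`, `ts`), and bucket count are placeholders.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public final class LsmLayoutWriteSketch {

  // Writes a DataFrame into a new MOR table with the proposed LSM tree layout enabled.
  public static void bulkInsert(Dataset<Row> df, String basePath) {
    df.write().format("hudi")
        .option("hoodie.table.name", "lsm_demo")
        .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
        .option("hoodie.datasource.write.recordkey.field", "id")
        .option("hoodie.datasource.write.precombine.field", "ts")
        // Recommended writer setup from this RFC: bucket index + bulk insert.
        .option("hoodie.datasource.write.operation", "bulk_insert")
        .option("hoodie.index.type", "BUCKET")
        .option("hoodie.bucket.index.num.buckets", "8")
        // Proposed by this RFC; only allowed when the table is first created.
        .option("hoodie.table.storage.layout", "lsm_tree")
        .mode(SaveMode.Append)
        .save(basePath);
  }
}
```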
### Indexes

Writer indexes should still function as-is under this layout. The same holds for reader indexes.

### Clustering

Clustering will be restricted to **record key sorting** only.

For the **MOR + bucket index** setup, clustering is typically not needed.

## Core Feature 1: Sorted Write

All writes are sorted. That is, within any written file (**base or log**), records are fully sorted by record key(s).

All write operations and writer index types should be supported, as the layout is only about keeping records sorted in data files, which is orthogonal to the choice of write operation and index type.

### Example: Flink Streaming Write Pipeline

The write pipeline mainly consists of four core stages:

- **Repartitioning**
- **Sorting**
- **Deduplication**
- **I/O**

Optimizations:

1. **Asynchronous processing architecture**
   Introduce a **Disruptor ring buffer** within the sink operator to decouple production and consumption, significantly improving throughput and handling cases where the producer outpaces the consumer.

2. **Efficient memory management**
   Integrate Flink's built-in **MemorySegmentPool** with **BinaryInMemorySortBuffer** to enable fine-grained memory control and efficient sorting, greatly reducing GC pressure and sorting overhead.

## Core Feature 2: Sorted Merge Read / Compaction

During read and compaction, merging is performed using a **sorted merge algorithm** (e.g., a **loser tree** for k-way merge), as sketched after the lists below.

- Resulting **log files** contain fully sorted records
- Resulting **base files** contain fully sorted records
- File group reads reuse the same sorted merge logic, with **predicate pruning** applied when present

### Implementation tasks

- Implement sorted merge: a **loser tree** for **k-way merge**
- Reuse the existing **Record Merger APIs**
- Update the following components to use sorted merge:
  - Log compaction
  - Compaction runner (L0 → L1)
  - File group reader
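Below is a minimal sketch of the k-way sorted merge shared by compaction and the file group reader, under a few assumptions: the RFC calls for a loser tree, but a `PriorityQueue` is used here to keep the illustration short; inputs are ordered oldest to newest (base file first, then log files in commit order), each already sorted and deduplicated by record key; and the generic record type plus merge function stand in for Hudi's Record Merger APIs rather than real Hudi classes.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BinaryOperator;

public final class SortedMergeSketch {

  // One heap entry: the current head record of input #sourceIdx.
  private record Node<R>(String key, R value, int sourceIdx) {}

  // Merges k key-sorted inputs into one key-sorted, deduplicated output.
  public static <R> List<Map.Entry<String, R>> merge(
      List<Iterator<Map.Entry<String, R>>> sortedInputs,   // oldest input first
      BinaryOperator<R> recordMerger) {                     // (older, newer) -> merged

    // Order by record key; on equal keys the older source is polled first,
    // so the merger always sees (older, newer).
    PriorityQueue<Node<R>> heap = new PriorityQueue<>(
        Comparator.comparing((Node<R> n) -> n.key()).thenComparingInt(n -> n.sourceIdx()));
    for (int i = 0; i < sortedInputs.size(); i++) {
      pushHead(heap, sortedInputs.get(i), i);
    }

    List<Map.Entry<String, R>> out = new ArrayList<>();
    while (!heap.isEmpty()) {
      Node<R> smallest = heap.poll();
      pushHead(heap, sortedInputs.get(smallest.sourceIdx()), smallest.sourceIdx());

      R merged = smallest.value();
      // Duplicates of the same key can only sit at the heads of other (newer) inputs.
      while (!heap.isEmpty() && heap.peek().key().equals(smallest.key())) {
        Node<R> newer = heap.poll();
        pushHead(heap, sortedInputs.get(newer.sourceIdx()), newer.sourceIdx());
        merged = recordMerger.apply(merged, newer.value());
      }
      out.add(new SimpleEntry<>(smallest.key(), merged));
    }
    return out;
  }

  private static <R> void pushHead(
      PriorityQueue<Node<R>> heap, Iterator<Map.Entry<String, R>> input, int sourceIdx) {
    if (input.hasNext()) {
      Map.Entry<String, R> head = input.next();
      heap.add(new Node<>(head.getKey(), head.getValue(), sourceIdx));
    }
  }
}
```

A production implementation would expose the merge as a streaming iterator rather than materializing a list, and would swap the priority queue for the loser tree to reduce comparisons per emitted record.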
---

## Log format v2: native log file format

### Current log format (v1)

The current log format is organized as below (ref: [tech spec v8](https://hudi.apache.org/learn/tech-specs-1point0#log-format)):

```text
#HUDI# (magic, 6 bytes)
Block Size (8 bytes)
Log Format Version (4 bytes)
Block Type (4 bytes)
Header Metadata (variable)
Content Length (8 bytes)
Content (variable) - data block, embedded Avro/Parquet/HFile binary data
Footer Metadata (variable)
Reverse Pointer (8 bytes)
```

These fields are encoded into a custom binary format and stored in log files with an extension like `.log.<version>_<write_token>`.

### Proposed log format v2

The proposed new log format leverages the native file format's metadata layer to capture the metadata fields defined by the Hudi log format, while keeping the content field (data block). Take Parquet, for example:

```text
Row group 1 (data)
Row group 2 (data)
...
Footer
  - Parquet schema
  - Row group metadata
  - key-value metadata  <-- Hudi log format metadata goes in here
```

All Hudi log format metadata can be stored as key-value pairs:

| Hudi log format metadata                      | Parquet footer key                         |
|:----------------------------------------------|:-------------------------------------------|
| log format version                            | `hudi.log.format_version`                  |
| block type                                    | `hudi.log.block_type`                      |
| `INSTANT_TIME`                                | `hudi.log.instant_time`                    |
| `TARGET_INSTANT_TIME`                         | `hudi.log.target_instant_time`             |
| `SCHEMA`                                      | NA (use Parquet's native schema)           |
| `COMMAND_BLOCK_TYPE`                          | `hudi.log.command_block_type`              |
| `COMPACTED_BLOCK_TIMES`                       | `hudi.log.compacted_block_times`           |
| `RECORD_POSITIONS`                            | `hudi.log.record_positions`                |
| `BLOCK_IDENTIFIER`                            | `hudi.log.block_identifier`                |
| `IS_PARTIAL`                                  | `hudi.log.is_partial`                      |
| `BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS`  | `hudi.log.base_file_instant_of_positions`  |
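To make the mapping concrete, here is a hedged sketch of how a reader could pull these fields back out of a native Parquet log file's footer with the standard `parquet-hadoop` API. The class name and the way it would be wired into Hudi's log reading are assumptions; only the footer key names come from the table above.

```java
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public final class LogFormatV2FooterReader {

  // Returns the footer key-value metadata, which carries the fields that the V1
  // format stored in its custom binary header (block type, instant time, etc.).
  public static Map<String, String> readLogBlockMetadata(String logFilePath) throws IOException {
    Configuration conf = new Configuration();
    try (ParquetFileReader reader = ParquetFileReader.open(
        HadoopInputFile.fromPath(new Path(logFilePath), conf))) {
      return reader.getFooter().getFileMetaData().getKeyValueMetaData();
    }
  }

  public static void main(String[] args) throws IOException {
    Map<String, String> meta = readLogBlockMetadata(args[0]);
    System.out.println("block type   = " + meta.get("hudi.log.block_type"));
    System.out.println("instant time = " + meta.get("hudi.log.instant_time"));
  }
}
```

Because the metadata sits in the regular Parquet footer, any Parquet-aware tool can inspect it the same way, which ties into the tool-compatibility point discussed below.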
### Why native file format over embedded Parquet log blocks?

An alternative approach is to keep the V1 log format structure and embed Parquet-encoded data as the block content. However, the embedding approach has drawbacks compared to using native Parquet files:

| Aspect                    | Embedded Parquet (V1)                                                                                                               | Native Parquet (V2)                                           |
|---------------------------|--------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------|
| **Parquet optimizations** | Vectorized reads, predicate pushdown, column pruning available only after locating the block                                         | Available directly at file read                               |
| **Write model**           | Designed for append (for HDFS, not for object storage)                                                                               | Write-once model (aligns with object storage)                 |
| **Reading overhead**      | Must read the log block header first, then use the InLineFileSystem abstraction with offset translation to access embedded content   | Direct Parquet file read with metadata immediately available  |
| **Tool compatibility**    | Requires Hudi-specific readers                                                                                                        | Any Parquet-compatible tool can read                          |
| **Compression**           | Block-level only                                                                                                                      | Parquet's columnar encoding                                   |
| **Schema storage**        | Duplicated in header and content                                                                                                      | Consolidated in Parquet footer                                |

The native log file format approach can also be extended to other file formats, such as [Lance](https://lance.org/format/file/). The Hudi log format metadata can be stored in a Lance file's [global buffer](https://lance.org/format/file/#external-buffers) to facilitate log file operations.

### Block type handling

**Data blocks**: The entire file is a native Parquet file with `hudi.log.block_type` = `parquet_data`. The schema is stored natively in the Parquet footer (no duplication).

**Delete blocks**: Store delete records as Parquet with a delete schema containing the record key, partition path, and ordering value. Set `hudi.log.block_type` = `delete`.

**Command blocks**: Create a Parquet file with zero row groups (metadata only). Set `hudi.log.block_type` = `command` along with `hudi.log.command_block_type` and `hudi.log.target_instant_time`.

**CDC blocks**: Same structure as data blocks, with the CDC schema. Set `hudi.log.block_type` = `cdc`.

### Compatibility

1. **File naming**: Remains consistent with the existing log file naming to ensure compatibility
2. **Writer changes**: Block append operations are no longer supported. During writes, input data is written directly to new Parquet files
3. **Reader changes**: Detect the format via magic bytes (Parquet magic vs. Hudi magic)

| Scenario                              | Behavior                                    |
|:--------------------------------------|:--------------------------------------------|
| V2 reader reading V1 files            | Supported (detected by magic bytes)         |
| V1 reader reading V2 files            | Not supported (expected for older readers)  |
| Mixed V1/V2 files in same file slice  | Supported (reader detects per file)         |

### How this feature relates to LSM tree layout

Using the native log file format does not depend on using the LSM tree layout. Tables unlock the benefits described in this section regardless of whether they use the default or the LSM tree layout.

## Hybrid Compaction Strategy

Hudi currently supports two compaction types:

Review Comment:
   big +1 to this, looks very promising.
