xushiyan opened a new issue, #611:
URL: https://github.com/apache/hudi-rs/issues/611
## Summary
`crates/core/src/file_group/reader.rs::FileGroupReader::read_file_slice_from_paths`
hard-codes one merge algorithm: the sort-and-dedup implementation in
`crates/core/src/merge/record_merger.rs::RecordMerger`. This proposal introduces
a small trait abstraction so the file-group reader is agnostic of *how* merging
is performed, and additional merger implementations can be added without
touching the reader.
The motivating cases for pluggability:
- File groups with skewed key distributions where a sort-based dedup pays an
  O(n log n) cost that a key-indexed approach would avoid.
- Key-filtered scans (e.g., `IN(record_key, [...])`) where only a handful of
  keys survive, so a hash-keyed merger can short-circuit.
- Future support for externally supplied merge strategies via config.
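
To make the first two cases concrete, here is a minimal sketch of what a hash-keyed merger could look like. It is not hudi-rs code: `KeyedRow` and `hash_merge` are hypothetical names, rows are plain structs rather than Arrow `RecordBatch`es, deletes are left out, and letting later input win ties is an assumption made for the example.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for one row; real inputs are Arrow record batches.
#[derive(Debug, Clone, PartialEq)]
pub struct KeyedRow {
    pub key: String,
    pub ordering: i64,
    pub payload: String,
}

/// Keep the row with the highest ordering value per key in a single pass,
/// skipping keys outside `wanted` when a key filter is supplied.
pub fn hash_merge(rows: Vec<KeyedRow>, wanted: Option<&HashSet<String>>) -> Vec<KeyedRow> {
    let mut latest: HashMap<String, KeyedRow> = HashMap::new();
    for row in rows {
        // Short-circuit: filtered-out keys never enter the map.
        if let Some(keys) = wanted {
            if !keys.contains(&row.key) {
                continue;
            }
        }
        // Assumption: on equal ordering values the later input wins.
        let replace = match latest.get(&row.key) {
            Some(existing) => row.ordering >= existing.ordering,
            None => true,
        };
        if replace {
            latest.insert(row.key.clone(), row);
        }
    }
    // Sorting the survivors by key only makes the output deterministic.
    let mut out: Vec<KeyedRow> = latest.into_values().collect();
    out.sort_by(|a, b| a.key.cmp(&b.key));
    out
}
```

With a key filter of a few entries, the map never grows beyond those keys, which is the kind of short-circuit a sort over the full input cannot take.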
## Goals
- **Minimal additive change.** ~60 LOC in `crates/core`, no modification to the
  existing merge algorithm.
- **`read_file_slice` references the trait, not a concrete type.**
- **Backward compatible.** Today's algorithm becomes the default impl; no
  user-visible change.
- **Config-selectable.** Future impls land without API churn.
## Non-goals
- Streaming merge output. The trait returns a single `RecordBatch`, matching
  today's contract.
- Engine-aware mergers (DataFusion `ExecutionPlan` fusion).
- Async merge boundary. Merging is CPU-bound over already-materialized
  batches.
## Proposed trait
```rust
// crates/core/src/merge/mod.rs
pub trait RecordMerger: Send + Sync + std::fmt::Debug {
    /// Merge data + delete inputs into a single deduplicated batch.
    /// Caller passes the union of base-file and log-file batches;
    /// the merger picks the latest version per record key and
    /// applies deletes.
    fn merge(&self, inputs: RecordBatches) -> Result<RecordBatch>;

    /// Output schema. Lets callers pre-allocate or project
    /// without inspecting the result.
    fn output_schema(&self) -> &SchemaRef;
}
```
Three intentional choices:
1. **`&self`.** Mergers are configured at construction, and any per-call
   working state stays inside the impl's `merge` call. Callers can hold
   `Arc<dyn RecordMerger>` and reuse it across slices without locking.
2. **No `async`.** I/O happens upstream in scanning. If a future impl needs to
   spill to disk, the caller can wrap the `merge` call in `spawn_blocking` at
   the call site.
3. **Inputs are base/log-agnostic.** `RecordBatches` already carries
   `data_batches` plus `(delete_batch, instant_time)` pairs. Whether a data
   batch came from the base file or a log block is a scan-time detail and is
   not exposed at the merge boundary.
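
The first choice can be illustrated with a std-only sketch. The `Merger` trait, `LatestWins`, and `merge_slices` below are hypothetical stand-ins rather than the proposed API: batches are vectors of `(key, ordering)` pairs, and plain threads take the place of whatever executor the reader actually uses. The point is only that a `Send + Sync` trait object with a `&self` method can be shared across concurrent slice reads without a lock.

```rust
use std::sync::Arc;
use std::thread;

/// Simplified stand-in for the proposed trait: a batch is a vector of
/// (key, ordering) pairs rather than Arrow data.
pub trait Merger: Send + Sync + std::fmt::Debug {
    fn merge(&self, inputs: Vec<(u32, i64)>) -> Vec<(u32, i64)>;
}

/// Hypothetical impl with no configuration at all.
#[derive(Debug)]
pub struct LatestWins;

impl Merger for LatestWins {
    fn merge(&self, mut inputs: Vec<(u32, i64)>) -> Vec<(u32, i64)> {
        // All working state is local to this call, so `&self` needs no lock.
        // Sort by key ascending, then ordering descending.
        inputs.sort_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
        // Keep the first (highest-ordering) row of each key run.
        inputs.dedup_by_key(|row| row.0);
        inputs
    }
}

/// Merge each slice on its own thread, sharing one merger instance.
pub fn merge_slices(
    merger: Arc<dyn Merger>,
    slices: Vec<Vec<(u32, i64)>>,
) -> Vec<Vec<(u32, i64)>> {
    let handles: Vec<_> = slices
        .into_iter()
        .map(|slice| {
            let merger = Arc::clone(&merger);
            thread::spawn(move || merger.merge(slice))
        })
        .collect();
    handles
        .into_iter()
        .map(|handle| handle.join().expect("merge thread panicked"))
        .collect()
}
```

Because `merge` takes `&self`, cloning the `Arc` is the only per-thread cost; no `Mutex` or interior mutability is involved.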
## Default impl — `RecordBatchMerger`
Today's `RecordMerger` struct is renamed to `RecordBatchMerger` and gains the
trait impl as a one-line forward:
```rust
// crates/core/src/merge/record_batch_merger.rs
pub struct RecordBatchMerger {
    pub schema: SchemaRef,
    pub hudi_configs: Arc<HudiConfigs>,
}

// Existing algorithm unchanged: sort by (key, ordering, seqno) DESC,
// walk sorted indices, keep first-seen per key, subtract deletes.
impl RecordMerger for RecordBatchMerger {
    fn merge(&self, inputs: RecordBatches) -> Result<RecordBatch> {
        self.merge_record_batches(inputs)
    }

    fn output_schema(&self) -> &SchemaRef {
        &self.schema
    }
}
```
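
For readers unfamiliar with the existing algorithm, the comment in the block above can be sketched roughly as follows. This is an illustration over plain structs, not the code in `merge_record_batches`: `VersionedRow` and `sort_dedup` are hypothetical names, and the delete rule used here (a delete wins when its ordering value is greater than or equal to the surviving row's) is an assumption made for the sketch.

```rust
use std::cmp::Reverse;
use std::collections::HashMap;

/// Hypothetical stand-in for one data row.
#[derive(Debug, Clone, PartialEq)]
pub struct VersionedRow {
    pub key: String,
    pub ordering: i64,
    pub seqno: u64,
}

/// Sort by (key, ordering, seqno) descending, keep the first row seen per
/// key, then drop keys covered by a delete.
/// Assumption: a delete wins when its ordering is >= the surviving row's.
pub fn sort_dedup(mut rows: Vec<VersionedRow>, deletes: &[(String, i64)]) -> Vec<VersionedRow> {
    rows.sort_by_key(|r| Reverse((r.key.clone(), r.ordering, r.seqno)));
    // After the sort, the newest version of each key leads its run;
    // `dedup_by` removes the later element when the closure returns true.
    rows.dedup_by(|next, kept| next.key == kept.key);

    // Highest delete ordering per key.
    let mut deleted: HashMap<&str, i64> = HashMap::new();
    for (key, ordering) in deletes {
        let slot = deleted.entry(key.as_str()).or_insert(*ordering);
        *slot = (*slot).max(*ordering);
    }

    // Keep a row only if no delete covers it.
    rows.retain(|r| match deleted.get(r.key.as_str()) {
        Some(del_ordering) => *del_ordering < r.ordering,
        None => true,
    });
    rows
}
```

The output comes back in descending key order as a side effect of the sort; the sketch makes no claim about the row order of the real implementation.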
## Factory and config
```rust
// crates/core/src/merge/mod.rs
pub fn create_record_merger(
    schema: SchemaRef,
    hudi_configs: Arc<HudiConfigs>,
) -> Result<Arc<dyn RecordMerger>> {
    let impl_name: String = hudi_configs.get_or_default(RecordMergerImpl).into();
    match impl_name.as_str() {
        "record_batch" => Ok(Arc::new(RecordBatchMerger::new(schema, hudi_configs))),
        other => Err(CoreError::Config(format!(
            "unknown record_merger_impl: {other}"
        ))),
    }
}
```
```rust
// crates/core/src/config/read.rs
pub enum HudiReadConfig {
    // ... existing variants ...
    RecordMergerImpl, // default: "record_batch"
}
```
`RecordMerger::validate_configs` (today's static method on the struct) becomes
a free function `merge::validate_configs(&HudiConfigs)` called at table-builder
time. Both current and future impls share validation.
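
The dispatch behavior of the factory can be tried in isolation with a std-only sketch. Everything here is a stand-in: a `HashMap<String, String>` replaces `HudiConfigs`, a `String` replaces `CoreError`, and the key string `record_merger_impl` is assumed from the error message above rather than taken from a defined config key.

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Simplified stand-in for the proposed trait.
pub trait Merger: Send + Sync + std::fmt::Debug {
    fn name(&self) -> &'static str;
}

/// Stand-in for the default sort-and-dedup impl.
#[derive(Debug)]
pub struct SortDedupMerger;

impl Merger for SortDedupMerger {
    fn name(&self) -> &'static str {
        "record_batch"
    }
}

/// Hypothetical config key; the real key string is not given in the proposal.
pub const MERGER_IMPL_KEY: &str = "record_merger_impl";
/// Default taken from the comment on `RecordMergerImpl`.
pub const DEFAULT_MERGER_IMPL: &str = "record_batch";

/// Pick an impl by name, falling back to the default when the key is unset
/// and returning an error for names that no impl claims.
pub fn create_merger(configs: &HashMap<String, String>) -> Result<Arc<dyn Merger>, String> {
    let impl_name = configs
        .get(MERGER_IMPL_KEY)
        .map(String::as_str)
        .unwrap_or(DEFAULT_MERGER_IMPL);
    match impl_name {
        "record_batch" => Ok(Arc::new(SortDedupMerger)),
        other => Err(format!("unknown record_merger_impl: {}", other)),
    }
}
```

An unset key yields the default merger, so existing tables behave as before; a new impl would only add one match arm.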
## Call-site change
```rust
// crates/core/src/file_group/reader.rs, in read_file_slice_from_paths

// Before
let merger = RecordMerger::new(schema.clone(), self.hudi_configs.clone());
let merged = merger.merge_record_batches(all_batches)?;

// After
let merger = create_record_merger(schema.clone(), self.hudi_configs.clone())?;
let merged = merger.merge(all_batches)?;
```
Three lines changed. `FileGroupReader` no longer mentions the concrete merger
type — only the trait.
## Testing
Two layers:
- **Trait-level conformance suite** at `crates/core/src/merge/tests.rs`:
  empty input, append-only, overwrite-with-latest, deletes-only, mixed
  insert/update/delete, null ordering values. Runs through
  `&dyn RecordMerger`. Any future impl must pass.
- **Impl-specific tests** for `RecordBatchMerger` stay as they are today
  (sort/dedup edge cases, null ordering handling).
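
A conformance suite of this shape might look like the sketch below. The `Merger` trait, `SortDedup`, and `check_conformance` are hypothetical and heavily simplified: rows are `(key, ordering)` pairs, deletes are bare keys that always win, and the null-ordering case is omitted because the stand-in type has no nulls.

```rust
/// Simplified stand-in for the proposed trait; deletes are bare keys.
pub trait Merger: std::fmt::Debug {
    fn merge(&self, rows: Vec<(u32, i64)>, deletes: Vec<u32>) -> Vec<(u32, i64)>;
}

/// Stand-in for the default impl.
#[derive(Debug)]
pub struct SortDedup;

impl Merger for SortDedup {
    fn merge(&self, mut rows: Vec<(u32, i64)>, deletes: Vec<u32>) -> Vec<(u32, i64)> {
        // Key ascending, ordering descending, then keep the first row per key.
        rows.sort_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
        rows.dedup_by_key(|row| row.0);
        // Assumption for this sketch: a delete always removes its key.
        rows.retain(|row| !deletes.contains(&row.0));
        rows
    }
}

/// Shared checks that any impl is expected to pass. Output is sorted before
/// comparison so impls remain free to emit rows in any order.
pub fn check_conformance(merger: &dyn Merger) {
    let run = |rows: Vec<(u32, i64)>, deletes: Vec<u32>| {
        let mut out = merger.merge(rows, deletes);
        out.sort();
        out
    };
    // Empty input.
    assert!(run(vec![], vec![]).is_empty(), "{:?}: empty input", merger);
    // Append-only: distinct keys all survive.
    assert_eq!(run(vec![(2, 1), (1, 1)], vec![]), vec![(1, 1), (2, 1)]);
    // Overwrite-with-latest: the highest ordering wins.
    assert_eq!(run(vec![(1, 1), (1, 3), (1, 2)], vec![]), vec![(1, 3)]);
    // Deletes-only: nothing to emit.
    assert!(run(vec![], vec![1]).is_empty(), "{:?}: deletes only", merger);
    // Mixed insert/update/delete.
    assert_eq!(
        run(vec![(1, 1), (2, 1), (2, 2), (3, 1)], vec![2]),
        vec![(1, 1), (3, 1)]
    );
}
```

Each impl's test module would then need only a single call such as `check_conformance(&SortDedup)`, plus its own impl-specific cases.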
## Estimated change size
~60 LOC additive on `crates/core`. Zero changes to the merge algorithm.
Backward compatible (default selects today's behavior).