xushiyan opened a new issue, #611:
URL: https://github.com/apache/hudi-rs/issues/611

   ## Summary
   
   
   `crates/core/src/file_group/reader.rs::FileGroupReader::read_file_slice_from_paths`
   hard-codes one merge algorithm: the sort-and-dedup implementation in
   `crates/core/src/merge/record_merger.rs::RecordMerger`. This proposal introduces
   a small trait abstraction so the file-group reader is agnostic of *how* merging
   is performed, and additional merger implementations can be added without
   touching the reader.
   
   The motivating cases for pluggability:
   
   - File groups with skewed key distributions where a sort-based dedup pays an
     O(n log n) cost that a key-indexed approach would avoid.
   - Key-filtered scans (e.g., `IN(record_key, [...])`) where only a handful of
     keys survive: a hash-keyed merger can drop non-matching keys as it goes
     instead of sorting every row in the slice.
   - Future support for externally supplied merge strategies via config.
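   To make the key-indexed alternative concrete, here is a minimal std-only sketch of a hash-keyed merge. It is illustrative only: `Record`, `hash_merge`, and the optional key filter are hypothetical, rows are plain structs rather than Arrow batches, and deletes are omitted. One pass keeps the row with the greatest `(ordering, seqno)` per key, so the expected cost is O(n) rather than O(n log n), and rows whose key falls outside the filter are discarded before they are stored.

   ```rust
   use std::collections::{HashMap, HashSet};

   /// Simplified stand-in for one row: record key, ordering value, and a
   /// sequence number that breaks ties between equal ordering values.
   #[derive(Debug, Clone, PartialEq)]
   struct Record {
       key: String,
       ordering: i64,
       seqno: u64,
   }

   /// Hypothetical key-indexed merge: one pass over all rows, keeping the row
   /// with the greatest (ordering, seqno) per key. When `key_filter` is given,
   /// rows for other keys are skipped before they are stored.
   fn hash_merge(inputs: Vec<Record>, key_filter: Option<&HashSet<String>>) -> Vec<Record> {
       let mut latest: HashMap<String, Record> = HashMap::new();
       for rec in inputs {
           if let Some(filter) = key_filter {
               if !filter.contains(&rec.key) {
                   continue; // key cannot survive the scan predicate
               }
           }
           let candidate = (rec.ordering, rec.seqno);
           // Replace only if nothing is stored yet or the new row is strictly newer.
           let replace = latest
               .get(&rec.key)
               .map_or(true, |cur| (cur.ordering, cur.seqno) < candidate);
           if replace {
               latest.insert(rec.key.clone(), rec);
           }
       }
       // Sort by key only so the output order is deterministic.
       let mut out: Vec<Record> = latest.into_values().collect();
       out.sort_by(|a, b| a.key.cmp(&b.key));
       out
   }
   ```

   The closing sort touches only the surviving keys, so under a selective key filter it is cheap; an impl that does not need deterministic order could skip it.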
   
   ## Goals
   
   - **Minimal additive change.** ~60 LOC in `crates/core`, with no modification
     to the existing merge algorithm.
   - **`read_file_slice` references the trait, not a concrete type.**
   - **Backward compatible.** Today's algorithm becomes the default impl; no
     user-visible change.
   - **Config-selectable.** Future impls land without API churn.
   
   ## Non-goals
   
   - Streaming merge output. Trait returns a single `RecordBatch`, matching
     today's contract.
   - Engine-aware mergers (DataFusion `ExecutionPlan` fusion).
   - Async merge boundary. Merging is CPU-bound over already-materialized batches.
   
   ## Proposed trait
   
   ```rust
   // crates/core/src/merge/mod.rs
   
   pub trait RecordMerger: Send + Sync + std::fmt::Debug {
       /// Merge data + delete inputs into a single deduplicated batch.
       /// Caller passes the union of base-file and log-file batches —
       /// the merger picks the latest version per record key and
       /// applies deletes.
       fn merge(&self, inputs: RecordBatches) -> Result<RecordBatch>;
   
       /// Output schema. Lets callers pre-allocate or project
       /// without inspecting the result.
       fn output_schema(&self) -> &SchemaRef;
   }
   ```
   
   Three intentional choices:
   
   1. **`&self`.** Mergers are configured at construction; any per-call state is
      local to the `merge` call. Callers can hold an `Arc<dyn RecordMerger>` and
      reuse it across slices without locking.
   2. **No `async`.** I/O happens upstream during scanning. If a future impl needs
      to spill to disk, the call site can wrap the merge in `spawn_blocking`.
   3. **Inputs are base/log-agnostic.** `RecordBatches` already carries
      `data_batches` plus `(delete_batch, instant_time)` pairs. Whether a data
      batch came from the base file or a log block is a scan-time detail and not
      exposed at the merge boundary.
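   The effect of choosing `&self` plus `Send + Sync` can be shown with a std-only sketch. Simplified `Inputs`/`Batch` types stand in for `RecordBatches`/`RecordBatch`, and `MaxOrderingMerger` and `merge_slices` are hypothetical. A single `Arc<dyn RecordMerger>` is cloned into several threads, each merging a different slice, with no `Mutex` involved.

   ```rust
   use std::fmt::Debug;
   use std::sync::Arc;
   use std::thread;

   /// Simplified stand-ins: a row is (record_key, ordering_value), a `Batch`
   /// is a list of rows, and `Inputs` plays the role of `RecordBatches`.
   type Batch = Vec<(String, i64)>;

   #[derive(Debug, Clone)]
   struct Inputs {
       data: Vec<Batch>,
       deletes: Vec<String>,
   }

   trait RecordMerger: Send + Sync + Debug {
       fn merge(&self, inputs: Inputs) -> Result<Batch, String>;
   }

   /// Hypothetical toy impl: it holds no mutable state, so `&self` suffices.
   #[derive(Debug)]
   struct MaxOrderingMerger;

   impl RecordMerger for MaxOrderingMerger {
       fn merge(&self, inputs: Inputs) -> Result<Batch, String> {
           let Inputs { data, deletes } = inputs;
           let mut rows: Batch = data.into_iter().flatten().collect();
           // Key ascending, then ordering value descending, so the newest
           // version of each key comes first.
           rows.sort_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
           // Drop later (older) rows that repeat the previous row's key.
           rows.dedup_by(|later, earlier| later.0 == earlier.0);
           rows.retain(|(k, _)| !deletes.contains(k));
           Ok(rows)
       }
   }

   /// Merges several slices concurrently through one shared trait object.
   /// No Mutex is needed: `merge` takes `&self` and the trait requires `Sync`.
   fn merge_slices(merger: Arc<dyn RecordMerger>, slices: Vec<Inputs>) -> Vec<Batch> {
       let handles: Vec<_> = slices
           .into_iter()
           .map(|slice| {
               let m = Arc::clone(&merger);
               thread::spawn(move || m.merge(slice).expect("merge failed"))
           })
           .collect();
       handles
           .into_iter()
           .map(|h| h.join().expect("merge thread panicked"))
           .collect()
   }
   ```

   Had `merge` taken `&mut self`, each thread would need its own merger or a lock around a shared one; keeping configuration immutable avoids both.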
   
   ## Default impl — `RecordBatchMerger`
   
   Today's `RecordMerger` struct is renamed to `RecordBatchMerger` and gains the
   trait impl as a one-line forward:
   
   ```rust
   // crates/core/src/merge/record_batch_merger.rs
   
   pub struct RecordBatchMerger {
       pub schema: SchemaRef,
       pub hudi_configs: Arc<HudiConfigs>,
   }
   
   // existing algorithm unchanged — sort by (key, ordering, seqno) DESC,
   // walk sorted indices, keep first-seen per key, subtract deletes.
   
   impl RecordMerger for RecordBatchMerger {
       fn merge(&self, inputs: RecordBatches) -> Result<RecordBatch> {
           self.merge_record_batches(inputs)
       }
       fn output_schema(&self) -> &SchemaRef { &self.schema }
   }
   ```
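   The algorithm summarized in the comment above can be approximated over plain Rust vectors. This is a sketch, not the code in `record_merger.rs`: `Columns` and `sort_dedup_indices` are hypothetical, the parallel vectors mimic Arrow columns, and a delete is assumed to remove its key unconditionally (the real merger's handling of ordering values and instant times for deletes may differ). It returns the surviving row indices; with Arrow, a gather such as the `take` kernel would then build the output batch.

   ```rust
   use std::collections::HashSet;

   /// Columnar stand-in for the concatenated inputs: parallel vectors
   /// indexed by row, mimicking Arrow columns.
   struct Columns {
       keys: Vec<String>,
       ordering: Vec<i64>,
       seqno: Vec<u64>,
   }

   /// Sort-and-dedup sketch: sort row indices by (key, ordering, seqno)
   /// descending, keep the first index seen per key, then subtract deletes.
   fn sort_dedup_indices(cols: &Columns, deleted_keys: &HashSet<String>) -> Vec<usize> {
       let mut idx: Vec<usize> = (0..cols.keys.len()).collect();
       // Comparing b against a yields descending order on all three columns.
       idx.sort_by(|&a, &b| {
           (&cols.keys[b], cols.ordering[b], cols.seqno[b])
               .cmp(&(&cols.keys[a], cols.ordering[a], cols.seqno[a]))
       });
       let mut kept = Vec::new();
       let mut prev_key: Option<&str> = None;
       for i in idx {
           let key = cols.keys[i].as_str();
           if prev_key == Some(key) {
               continue; // an older version of a key already decided
           }
           prev_key = Some(key);
           if !deleted_keys.contains(key) {
               kept.push(i);
           }
       }
       kept
   }
   ```

   Sorting indices instead of rows keeps the wide payload columns untouched until the single final gather.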
   
   ## Factory and config
   
   ```rust
   // crates/core/src/merge/mod.rs
   
   pub fn create_record_merger(
       schema: SchemaRef,
       hudi_configs: Arc<HudiConfigs>,
   ) -> Result<Arc<dyn RecordMerger>> {
       let impl_name: String =
           hudi_configs.get_or_default(RecordMergerImpl).into();
       match impl_name.as_str() {
           "record_batch" => Ok(Arc::new(RecordBatchMerger::new(schema, hudi_configs))),
           other => Err(CoreError::Config(format!(
               "unknown record_merger_impl: {other}"
           ))),
       }
   }
   ```
   
   ```rust
   // crates/core/src/config/read.rs
   pub enum HudiReadConfig {
       // ... existing variants ...
       RecordMergerImpl,   // default: "record_batch"
   }
   ```
   
   `RecordMerger::validate_configs` (today's static method on the struct) becomes
   a free function `merge::validate_configs(&HudiConfigs)` called at table-builder
   time, so current and future impls share the same validation.
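   The selection logic in `create_record_merger` can be exercised in isolation with a std-only sketch. A `HashMap<String, String>` stands in for `HudiConfigs`, the trait is reduced to a `name()` method, and the key string `record_merger_impl` is a hypothetical placeholder, since the proposal does not state the actual config key behind `RecordMergerImpl`. An absent key falls back to `"record_batch"`; any other value is rejected instead of silently defaulting.

   ```rust
   use std::collections::HashMap;
   use std::fmt::Debug;
   use std::sync::Arc;

   /// Reduced trait for this sketch: only identifies the selected impl.
   trait RecordMerger: Send + Sync + Debug {
       fn name(&self) -> &'static str;
   }

   #[derive(Debug)]
   struct RecordBatchMerger;

   impl RecordMerger for RecordBatchMerger {
       fn name(&self) -> &'static str {
           "record_batch"
       }
   }

   /// Hypothetical key string; the real key for `RecordMergerImpl` is unspecified.
   const RECORD_MERGER_IMPL_KEY: &str = "record_merger_impl";
   const DEFAULT_RECORD_MERGER_IMPL: &str = "record_batch";

   /// Picks an impl by name, defaulting to today's algorithm.
   fn create_record_merger(
       configs: &HashMap<String, String>,
   ) -> Result<Arc<dyn RecordMerger>, String> {
       let impl_name = configs
           .get(RECORD_MERGER_IMPL_KEY)
           .map(String::as_str)
           .unwrap_or(DEFAULT_RECORD_MERGER_IMPL);
       match impl_name {
           "record_batch" => Ok(Arc::new(RecordBatchMerger)),
           other => Err(format!("unknown record_merger_impl: {}", other)),
       }
   }
   ```

   Failing on an unknown name surfaces a typo in configuration at table-build time rather than producing results from an unintended merger.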
   
   ## Call-site change
   
   ```rust
   // crates/core/src/file_group/reader.rs, in read_file_slice_from_paths
   
   // Before
   let merger = RecordMerger::new(schema.clone(), self.hudi_configs.clone());
   let merged = merger.merge_record_batches(all_batches)?;
   
   // After
   let merger = create_record_merger(schema.clone(), self.hudi_configs.clone())?;
   let merged = merger.merge(all_batches)?;
   ```
   
   Three lines changed. `FileGroupReader` no longer mentions the concrete merger
   type — only the trait.
   
   ## Testing
   
   Two layers:
   
   - **Trait-level conformance suite** at `crates/core/src/merge/tests.rs`:
     empty input, append-only, overwrite-with-latest, deletes-only, mixed
     insert/update/delete, null ordering values. Runs through `&dyn RecordMerger`.
     Any future impl must pass.
   - **Impl-specific tests** for `RecordBatchMerger` stay as they are today
     (sort/dedup edge cases, null ordering handling).
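   A conformance suite of this shape might look like the following std-only sketch. Everything in it is hypothetical: rows are `(record_key, ordering_value)` pairs, deletes are bare keys that remove a key unconditionally, and `SortMerger`/`HashMerger` are toy impls standing in for `RecordBatchMerger` and a possible future hash-keyed merger. The null-ordering case is omitted because the proposal does not spell out its expected result. Every case runs through `&dyn RecordMerger`.

   ```rust
   use std::collections::{HashMap, HashSet};
   use std::fmt::Debug;

   type Row = (String, i64); // (record_key, ordering_value)

   #[derive(Debug, Clone)]
   struct Inputs {
       data: Vec<Row>,
       deletes: Vec<String>,
   }

   trait RecordMerger: Send + Sync + Debug {
       fn merge(&self, inputs: Inputs) -> Result<Vec<Row>, String>;
   }

   /// Toy sort-based impl: newest per key, then subtract deletes.
   #[derive(Debug)]
   struct SortMerger;
   impl RecordMerger for SortMerger {
       fn merge(&self, inputs: Inputs) -> Result<Vec<Row>, String> {
           let Inputs { mut data, deletes } = inputs;
           data.sort_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
           data.dedup_by(|later, earlier| later.0 == earlier.0);
           data.retain(|(k, _)| !deletes.contains(k));
           Ok(data)
       }
   }

   /// Toy hash-based impl: max ordering per key, output sorted by key.
   #[derive(Debug)]
   struct HashMerger;
   impl RecordMerger for HashMerger {
       fn merge(&self, inputs: Inputs) -> Result<Vec<Row>, String> {
           let Inputs { data, deletes } = inputs;
           let deletes: HashSet<String> = deletes.into_iter().collect();
           let mut latest: HashMap<String, i64> = HashMap::new();
           for (k, ord) in data {
               let e = latest.entry(k).or_insert(ord);
               if ord > *e {
                   *e = ord;
               }
           }
           let mut rows: Vec<Row> = latest
               .into_iter()
               .filter(|(k, _)| !deletes.contains(k))
               .collect();
           rows.sort();
           Ok(rows)
       }
   }

   /// Each case: (name, inputs, expected output sorted by key).
   fn conformance_cases() -> Vec<(&'static str, Inputs, Vec<Row>)> {
       let r = |k: &str, o: i64| (k.to_string(), o);
       vec![
           ("empty input", Inputs { data: vec![], deletes: vec![] }, vec![]),
           ("append-only", Inputs { data: vec![r("a", 1), r("b", 1)], deletes: vec![] }, vec![r("a", 1), r("b", 1)]),
           ("overwrite-with-latest", Inputs { data: vec![r("a", 1), r("a", 3), r("a", 2)], deletes: vec![] }, vec![r("a", 3)]),
           ("deletes-only", Inputs { data: vec![], deletes: vec!["a".to_string()] }, vec![]),
           ("mixed", Inputs { data: vec![r("a", 1), r("b", 1), r("a", 2), r("c", 1)], deletes: vec!["b".to_string()] }, vec![r("a", 2), r("c", 1)]),
       ]
   }

   /// Runs every case through `&dyn RecordMerger`; returns failing case names.
   fn run_conformance(merger: &dyn RecordMerger) -> Vec<&'static str> {
       conformance_cases()
           .into_iter()
           .filter(|(_, inputs, expected)| merger.merge(inputs.clone()).as_ref() != Ok(expected))
           .map(|(name, _, _)| name)
           .collect()
   }
   ```

   Returning the names of failing cases, rather than panicking on the first mismatch, lets one run report every broken case for a new impl.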
   
   ## Estimated change size
   
   ~60 LOC additive on `crates/core`. Zero changes to the merge algorithm.
   Backward compatible (default selects today's behavior).
   

