xushiyan commented on issue #611:
URL: https://github.com/apache/hudi-rs/issues/611#issuecomment-4456988117

   ## Note: adopting a hash-based merger as a second impl
   
   A separate line of work is exploring a per-key hash-buffer merger (analogous
   to a `Map<RecordKey, BufferedRecord>` model). This comment captures how that
   impl plugs into the trait above.
   
   ### Adapter
   
   ```rust
   // crates/core/src/merge/hash_based_merger.rs
   
   pub struct HashBasedMerger {
       schema: SchemaRef,
       hudi_configs: Arc<HudiConfigs>,
   }
   
   impl RecordMerger for HashBasedMerger {
       fn merge(&self, inputs: RecordBatches) -> Result<RecordBatch> {
           let mut buffer = KeyBasedFileGroupRecordBuffer::new(
               &self.schema,
               &self.hudi_configs,
           )?;
           let record_ctx = RecordContext::from_configs(&self.hudi_configs)?;
   
           for (delete_batch, instant) in inputs.delete_batches {
               buffer.process_delete_batch(&record_ctx, delete_batch, &instant)?;
           }
           for data_batch in inputs.data_batches {
               for buffered in record_ctx.batch_to_buffered_records(data_batch)? {
                   let key = buffered.record_key.clone();
                   buffer.process_next_data_record(buffered, key)?;
               }
           }
           buffer.drain_to_record_batch(&self.schema)
       }
   
       fn output_schema(&self) -> &SchemaRef { &self.schema }
   }
   ```
   
   Factory grows one arm:
   
   ```rust
   "hash_based" => Ok(Arc::new(HashBasedMerger::new(schema, hudi_configs))),
   ```
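
   To show where that arm sits, here is a minimal sketch of a string-keyed
   factory. The function name `create_merger`, the `String` error type, and the
   one-method stand-in trait are assumptions made so the example compiles on
   its own; the real factory takes `schema` and `hudi_configs` and returns the
   crate's own `Result`.

   ```rust
   use std::sync::Arc;

   // Stand-in for the real `RecordMerger` trait, reduced to a single method so
   // the sketch is self-contained. The real trait has `merge` and `output_schema`.
   pub trait RecordMerger {
       fn name(&self) -> &'static str;
   }

   // Stand-in for the adapter; the real struct holds a schema and configs.
   pub struct HashBasedMerger;

   impl RecordMerger for HashBasedMerger {
       fn name(&self) -> &'static str {
           "hash_based"
       }
   }

   // Hypothetical factory: maps a strategy name to a merger implementation.
   pub fn create_merger(strategy: &str) -> Result<Arc<dyn RecordMerger>, String> {
       match strategy {
           "hash_based" => Ok(Arc::new(HashBasedMerger)),
           // Unknown names are rejected rather than silently defaulted.
           other => Err(format!("unknown merger strategy: {other}")),
       }
   }
   ```

   Returning `Arc<dyn RecordMerger>` keeps callers independent of the concrete
   merger type, so adding an implementation only touches the `match`.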
   
   ### Extractions required for the adapter
   
   Three narrow public-surface additions to `KeyBasedFileGroupRecordBuffer`.
   None rewrites internal logic; each lifts an existing internal path to a
   callable boundary.
   
   1. **`new(schema, hudi_configs) -> Result<Self>`** — a thin constructor that
      skips the full reader-context chain. The buffer only needs schema +
      configs.
   2. **`process_delete_batch(record_ctx, batch, instant_time)`** — factor out
      the delete-row → buffer update loop so it accepts a `RecordBatch` instead
      of being inlined behind a `LogBlock` envelope in the log scanner.
   3. **`drain_to_record_batch(schema) -> Result<RecordBatch>`** — lift the
      HashMap-to-batch materialization out of the reader pipeline.
   
   ### One semantic concession
   
   Designs that natively distinguish base-file records from log-file records
   (two-phase merge: log-vs-log during scan, base-vs-log at iteration) flatten
   under this trait — `inputs.data_batches` is a mixed bag with no source tag.
   
   The adapter resolves this by deduplicating on ordering value alone: any
   record whose ordering value is higher than that of the current buffer entry
   wins. The final result is the same, so no source tag is needed at the trait
   boundary.
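
   The ordering-only rule can be sketched with a plain `HashMap`. This is an
   illustration, not the buffer's actual code: the simplified `BufferedRecord`
   fields (`ordering_value` as `i64`, `payload` as `String`) and the function
   `merge_by_ordering` are assumptions. Ties keep the existing entry, following
   the "higher ordering wins" wording above.

   ```rust
   use std::collections::HashMap;

   // Simplified stand-in for a buffered record; field types are assumptions.
   #[derive(Debug, Clone, PartialEq)]
   pub struct BufferedRecord {
       pub record_key: String,
       pub ordering_value: i64,
       pub payload: String,
   }

   // Keeps, per record key, the record with the highest ordering value.
   // Base-file and log-file records are treated identically: no source tag.
   pub fn merge_by_ordering(
       records: impl IntoIterator<Item = BufferedRecord>,
   ) -> HashMap<String, BufferedRecord> {
       let mut buffer: HashMap<String, BufferedRecord> = HashMap::new();
       for rec in records {
           // Replace only when the key is new or the incoming ordering value
           // is strictly higher than the buffered one.
           let replace = buffer
               .get(&rec.record_key)
               .map_or(true, |existing| rec.ordering_value > existing.ordering_value);
           if replace {
               buffer.insert(rec.record_key.clone(), rec);
           }
       }
       buffer
   }
   ```

   When no two records for a key share an ordering value, the result does not
   depend on arrival order, which is why the mixed `data_batches` input is
   acceptable. With ties, this sketch keeps whichever record arrived first.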
   
   ### Estimated change size for the adapter
   
   Adapter struct: ~100 LOC. Buffer surface extractions: ~50–100 LOC of code
   moves. No algorithm changes.
   

