osipovartem commented on issue #17267:
URL: https://github.com/apache/datafusion/issues/17267#issuecomment-3467735359

   Here are several ideas and directions for improving the HashJoin implementation that came up while digging through the DataFusion code.
   
   ### Hybrid spilling with memory awareness
     - We could extend SpillManager with an automatic hybrid spilling 
mechanism, something like:
   ```rust
   fn spill_batch_auto(&self, batch: &RecordBatch, request_msg: &str) -> Result<SpillLocation> { ... }
   ```
   Where
   ```rust
   pub enum SpillLocation {
       Memory(Arc<InMemorySpillBuffer>),
       Disk(RefCountedTempFile),
   }
   
   pub struct InMemorySpillBuffer {
       batches: Vec<RecordBatch>,
       total_bytes: usize,
   }
   ```
   This would allow DataFusion to decide dynamically whether to keep a batch in 
memory or spill it to disk based on current memory pool availability.
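
   A minimal sketch of that decision logic, assuming a hypothetical `memory_reservation` field on `SpillManager` (e.g. `Mutex<MemoryReservation>`) and a `spill_to_disk` helper standing in for the existing disk-spill path; neither is the current API:
   ```rust
   use std::sync::Arc;

   use arrow::record_batch::RecordBatch;
   use datafusion_common::Result;

   impl SpillManager {
       fn spill_batch_auto(&self, batch: &RecordBatch, request_msg: &str) -> Result<SpillLocation> {
           let bytes = batch.get_array_memory_size();

           // Hypothetical field: a reservation against the shared memory pool
           // (`memory_reservation: Mutex<MemoryReservation>`).
           let mut reservation = self.memory_reservation.lock().unwrap();

           // Keep the batch in memory only if the pool can still account for it.
           if reservation.try_grow(bytes).is_ok() {
               return Ok(SpillLocation::Memory(Arc::new(InMemorySpillBuffer {
                   batches: vec![batch.clone()],
                   total_bytes: bytes,
               })));
           }

           // Otherwise fall back to the existing disk spill path
           // (`spill_to_disk` stands in for the current spill-write helper).
           let file = self.spill_to_disk(batch, request_msg)?;
           Ok(SpillLocation::Disk(file))
       }
   }
   ```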
   
   ### Index-based left-side storage
     - Since the left (build-side) RecordBatch is only required later during 
build_batch_from_indices,
   we could store only references (metadata) to the actual spilled or in-memory 
chunks:
   ```rust
   pub struct LeftChunkIndex {
       /// Spill file or memory handle
       pub location: SpillLocation,
       /// Global start offset of this chunk (in build-side concatenation)
       pub start_row: usize,
       /// Number of rows in this chunk
       pub num_rows: usize,
   }
   ```
   - This allows lazy reloading of only the necessary build-side chunks during probing.
   When the **lookup_join_hashmap** function filters the candidate rows down, a significant portion of the build-side data may never need to be reloaded from disk or memory at all; a rough sketch of this reload path follows.
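
   A rough sketch of that reload path, using the `LeftChunkIndex` above; `read_spill_file` is only an illustrative placeholder for whatever spill-reading helper ends up being used:
   ```rust
   use arrow::record_batch::RecordBatch;
   use datafusion_common::Result;

   /// Reload only the build-side chunks that contain at least one row referenced
   /// by the probe-side matches (`needed_rows` are global build-side row indices).
   fn reload_needed_chunks(
       chunks: &[LeftChunkIndex],
       needed_rows: &[usize],
   ) -> Result<Vec<RecordBatch>> {
       let mut out = Vec::new();
       for chunk in chunks {
           let end = chunk.start_row + chunk.num_rows;
           // Skip chunks that no probe match points into; with selective joins
           // this is where most of the reload work is avoided.
           if !needed_rows.iter().any(|&r| r >= chunk.start_row && r < end) {
               continue;
           }
           match &chunk.location {
               // In-memory chunks are already materialized.
               SpillLocation::Memory(buf) => out.extend(buf.batches.iter().cloned()),
               // Disk chunks are read back only when actually referenced
               // (`read_spill_file` is a placeholder, not a real helper).
               SpillLocation::Disk(file) => out.extend(read_spill_file(file)?),
           }
       }
       Ok(out)
   }
   ```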
   
   ### Avoid duplicate evaluation during collect_left_input
   - Currently we evaluate the join-key expressions against a RecordBatch twice (not sure how heavy this operation is):
   ```rust
   let keys_values = on
       .iter()
       .map(|c| c.evaluate(batch)?.into_array(batch.num_rows()))
       .collect::<Result<Vec<_>>>()?;
   ```
   - once inside update_hash, and again later inside collect_left_input.
   It would be more efficient to return these values directly from update_hash so they can be reused, avoiding the redundant array computation; a simplified sketch follows below.
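
   A simplified and hypothetical sketch (the real `update_hash` takes more parameters, elided here) of returning the evaluated key arrays so the caller can reuse them:
   ```rust
   use std::sync::Arc;

   use arrow::array::ArrayRef;
   use arrow::record_batch::RecordBatch;
   use datafusion_common::Result;
   use datafusion_physical_expr::PhysicalExpr;

   fn update_hash(
       on: &[Arc<dyn PhysicalExpr>],
       batch: &RecordBatch,
       // ...hash map, offset, random state, hashes buffer, etc. elided...
   ) -> Result<Vec<ArrayRef>> {
       // Evaluate the join-key expressions exactly once.
       let keys_values = on
           .iter()
           .map(|c| c.evaluate(batch)?.into_array(batch.num_rows()))
           .collect::<Result<Vec<_>>>()?;

       // ...hashing and hash-map insertion would use `keys_values` here...

       // Hand the arrays back so collect_left_input can reuse them instead of
       // evaluating the same expressions a second time.
       Ok(keys_values)
   }
   ```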
   
   ### Reduced in-memory footprint per partition
   
   - By combining the above ideas, we could avoid keeping the entire left-side 
RecordBatch in memory for each partition.
   Instead, only the hash map and chunk indices would remain resident, while 
the actual build-side data could be reloaded on demand (from either memory 
buffers or spill files).
   - For TPC-H Q18 at scale factor 10 (lineitem table ~66M records, 32 partitions), the per-partition state we keep resident looks like this, and the hashmap looks over-allocated:
   ```
   JoinLeftData for partition 31 with total 182.74 MB:
    • Hashmap 68.00 MB
    • Batch: 114.50 MB, count 2069975
    • Bitmap: 252.68 KB
   ```
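
   To make the combined idea concrete, a hypothetical slimmed-down build-side state could look roughly like this (field names are illustrative, not the current JoinLeftData layout; `JoinHashMap` and the `LeftChunkIndex` above are assumed to be in scope):
   ```rust
   use std::sync::Mutex;

   use arrow::array::BooleanBufferBuilder;

   /// Hypothetical per-partition build-side state: only the hash map, the chunk
   /// index, and the visited bitmap stay resident; the build-side batches
   /// themselves are reloaded on demand through `SpillLocation`.
   pub struct SlimJoinLeftData {
       /// Maps join-key hashes to global build-side row indices.
       hash_map: JoinHashMap,
       /// Metadata for each spilled or in-memory build-side chunk.
       chunks: Vec<LeftChunkIndex>,
       /// Tracks which build-side rows matched (for LEFT/FULL/ANTI semantics).
       visited_indices_bitmap: Mutex<BooleanBufferBuilder>,
   }
   ```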
   

