rluvaton commented on code in PR #17163:
URL: https://github.com/apache/datafusion/pull/17163#discussion_r2288436676


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -299,13 +307,63 @@ impl ExternalSorter {
             batch_size,
             sort_spill_reservation_bytes,
             sort_in_place_threshold_bytes,
+            cursor_batch_ratio,
         })
     }
 
+    /// Calculates the ratio of memory used by the sort cursor to the original 
`RecordBatch`.
+    /// Returns the ratio `(cursor_size / batch_size) + 1.0`, representing the 
expected memory multiplier
+    /// when allocating space for both the original batch and its associated 
cursor.
+    ///
+    /// Mirrors the cursor selection logic in `StreamingMerge::build`
+    /// Performs the same conversion for ratio estimation, but discards the 
result.
+    fn calculate_ratio(&self, batch: &RecordBatch) -> Result<f64> {
+        let batch_size = get_record_batch_memory_size(batch);
+        if self.expr.len() == 1 {
+            let value = self.expr.first().expr.evaluate(batch)?;
+            let array = value.into_array(batch.num_rows())?;
+            let size_in_mem = array.get_buffer_memory_size();
+
+            Ok(size_in_mem as f64 / batch_size as f64 + 1.0)
+        } else {
+            let sort_fields = self
+                .expr
+                .iter()
+                .map(|sort_expr| {
+                    let data_type = sort_expr.expr.data_type(&self.schema)?;
+                    Ok(SortField::new_with_options(data_type, 
sort_expr.options))
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            let converter = RowConverter::new(sort_fields)?;
+            let mut rows = converter.empty_rows(0, 0);
+
+            let cols = self
+                .expr
+                .iter()
+                .map(|sort_expr| {
+                    
sort_expr.expr.evaluate(batch)?.into_array(batch.num_rows())
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            converter.append(&mut rows, &cols)?;
+
+            let rows = Arc::new(rows);
+
+            Ok(rows.size() as f64 / batch_size as f64 + 1.0)
+        }
+    }
+
     /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
     ///
     /// Updates memory usage metrics, and possibly triggers spilling to disk
     async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
+        // Only for first time
+        if self.cursor_batch_ratio.is_none() {
+            let ratio = self.calculate_ratio(&input)?;
+            self.cursor_batch_ratio = Some(ratio);
+        }

Review Comment:
   Can we defer this until we actually need it? If each sort only gets a 
single batch, or we do in-place sorting, we will convert it to rows needlessly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to