yyy1000 commented on code in PR #9469:
URL: https://github.com/apache/arrow-datafusion/pull/9469#discussion_r1513195857
##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -563,7 +507,7 @@ mod tests {
"| 1 | 3 | 0 |",
"+---+---+---+",
];
- assert_eq!(2, result.len());
+ assert_eq!(1, result.len());
Review Comment:
I think the sort in `ExternalSorter` merges the two `RecordBatch`es into one
when there is no spilling?
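
To illustrate why the expected batch count drops from 2 to 1, here is a toy model that uses plain vectors in place of `RecordBatch`. It is not DataFusion code: `sort_in_memory` is a hypothetical helper, and it assumes that the no-spill path concatenates all buffered batches before sorting them.

```rust
/// Toy stand-in for the in-memory (no spill) sort path.
/// Assumption: every buffered batch is concatenated and sorted as one unit,
/// so the caller gets back at most one output batch.
fn sort_in_memory(batches: Vec<Vec<i64>>) -> Vec<Vec<i64>> {
    // Flatten all buffered batches into a single run of rows.
    let mut all: Vec<i64> = batches.into_iter().flatten().collect();
    if all.is_empty() {
        // Nothing buffered, nothing to emit.
        return Vec::new();
    }
    all.sort();
    // A single sorted batch replaces the original inputs.
    vec![all]
}
```

With two input batches such as `[3, 1]` and `[2, 0]`, this model returns the single batch `[0, 1, 2, 3]`, which is the shape the updated assertion checks for.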
##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -510,6 +398,62 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_partial_sort_spill() -> Result<()> {
Review Comment:
This is the new test case for spilling; I mostly copied it from sort.rs.
##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -647,157 +591,6 @@ mod tests {
Ok(())
}
- fn prepare_partitioned_input() -> Arc<dyn ExecutionPlan> {
Review Comment:
I deleted these test cases for the same reason: the sort in `ExternalSorter`
merges the `RecordBatch`es, so I think these test cases are no longer necessary?
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -309,6 +310,66 @@ impl ExternalSorter {
Ok(())
}
+ fn get_slice_point(
+ &self,
+ common_prefix_len: usize,
+ batch: &RecordBatch,
+ ) -> Result<Option<usize>> {
+ let common_prefix_sort_keys = (0..common_prefix_len)
+ .map(|idx| self.expr[idx].evaluate_to_sort_column(batch))
+ .collect::<Result<Vec<_>>>()?;
+ let partition_points =
+ evaluate_partition_ranges(batch.num_rows(), &common_prefix_sort_keys)?;
+ // If partition points are [0..100], [100..200], [200..300]
+ // we should return 200, which is the safest and furthest partition boundary
+ // Please note that we shouldn't return 300 (which is number of rows in the batch),
+ // because this boundary may change with new data.
+ if partition_points.len() >= 2 {
+ Ok(Some(partition_points[partition_points.len() - 2].end))
+ } else {
+ Ok(None)
+ }
+ }
+ pub(crate) async fn insert_batch_with_prefix(
+ &mut self,
+ input: RecordBatch,
+ prefix: usize,
+ ) -> Result<()> {
+ if input.num_rows() == 0 {
+ return Ok(());
+ }
+ self.reserve_memory_for_merge()?;
+
+ let size = input.get_array_memory_size();
+ if self.reservation.try_grow(size).is_err() {
+ let before = self.reservation.size();
+ self.in_mem_sort().await?;
+ // Sorting may have freed memory, especially if fetch is `Some`
+ //
+ // As such we check again, and if the memory usage has dropped by
+ // a factor of 2, and we can allocate the necessary capacity,
+ // we don't spill
+ //
+ // The factor of 2 aims to avoid a degenerate case where the
+ // memory required for `fetch` is just under the memory available,
+ // causing repeated re-sorting of data
+ if self.reservation.size() > before / 2
+ || self.reservation.try_grow(size).is_err()
+ {
+ self.spill().await?;
+ self.reservation.try_grow(size)?
+ }
+ }
+ if let Some(slice_point) = self.get_slice_point(prefix, &input)? {
+ self.in_mem_batches.push(input.slice(0, slice_point));
+ self.in_mem_batches
+ .push(input.slice(slice_point, input.num_rows() - slice_point));
+ } else {
+ self.in_mem_batches.push(input);
+ }
+ self.in_mem_batches_sorted = false;
+ Ok(())
+ }
Review Comment:
Here it also calls `get_slice_point` first and then pushes the slices into
`in_mem_batches`. One question I have: previously, the 2nd `RecordBatch` was
concatenated onto the remainder of the 1st `RecordBatch`, but I didn't
implement that here. 🤔 I don't know whether it's necessary.
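
For reference, a minimal sketch of the two ideas in this comment, using a plain slice of `i64` prefix keys in place of a `RecordBatch`. Both `partition_ranges` and `split_with_carry` are hypothetical names introduced for illustration, not DataFusion APIs; `slice_point` mirrors the second-to-last-boundary rule of `get_slice_point` from the diff, and `split_with_carry` models the earlier carry-over behaviour, assuming the input is already sorted on the common prefix.

```rust
use std::ops::Range;

/// Split `keys` (already sorted on the common prefix) into runs of equal values.
fn partition_ranges(keys: &[i64]) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut start = 0;
    for i in 1..=keys.len() {
        // Close the current run at the end of input or when the key changes.
        if i == keys.len() || keys[i] != keys[start] {
            ranges.push(start..i);
            start = i;
        }
    }
    ranges
}

/// Same rule as `get_slice_point` in the diff: return the end of the
/// second-to-last partition, because the last partition may still grow
/// when the next batch arrives.
fn slice_point(keys: &[i64]) -> Option<usize> {
    let ranges = partition_ranges(keys);
    if ranges.len() >= 2 {
        Some(ranges[ranges.len() - 2].end)
    } else {
        None
    }
}

/// Hypothetical carry-over step: prepend the open remainder of the previous
/// batch to the next batch, then split again.
/// Returns `(closed_rows, still_open_remainder)`.
fn split_with_carry(carry: Vec<i64>, next: &[i64]) -> (Vec<i64>, Vec<i64>) {
    let mut combined = carry;
    combined.extend_from_slice(next);
    match slice_point(&combined) {
        Some(point) => {
            // Rows before `point` belong to finished partitions.
            let remainder = combined.split_off(point);
            (combined, remainder)
        }
        // Only one partition so far: everything stays open.
        None => (Vec::new(), combined),
    }
}
```

For keys `[1, 1, 2, 2, 3]` the slice point is 4, since the run of `3`s may continue in the next batch. Carrying `[3]` into a next batch `[3, 3, 4, 5]` closes `[3, 3, 3, 4]` and leaves `[5]` open. Whether this carry-over is still needed depends on whether the sorter re-sorts everything in `in_mem_batches` together, which this sketch does not model.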