viirya commented on code in PR #1022:
URL: https://github.com/apache/datafusion-comet/pull/1022#discussion_r1807068484
##########
native/core/src/execution/datafusion/shuffle_writer.rs:
##########
@@ -954,10 +986,65 @@ impl ShuffleRepartitioner {
});
Ok(used)
}
+
+ /// Appends rows of specified indices from columns into active array builders in the specified partition.
+ async fn append_rows_to_partition(
+ &mut self,
+ columns: &[ArrayRef],
+ indices: &[usize],
+ partition_id: usize,
+ ) -> Result<isize> {
+ let mut mem_diff = 0;
+
+ let output = &mut self.buffered_partitions[partition_id];
+
+ let time_metric = self.metrics.baseline.elapsed_compute();
+
+ // If the range of indices is not big enough, just append the rows into
+ // active array builders instead of directly adding them as a record batch.
+ let mut start_index: usize = 0;
+ let mut output_ret = output.append_rows(columns, indices, start_index, time_metric);
+
+ loop {
+ match output_ret {
+ AppendRowStatus::MemDiff(l) => {
+ mem_diff += l?;
+ break;
+ }
+ AppendRowStatus::StartIndex(new_start) => {
+ // Cannot allocate enough memory for the array builders in the partition,
+ // spill partitions and retry.
+ self.spill().await?;
+ self.reservation.free();
Review Comment:
I guess that is because we previously used some memory silently and never reported it to the reservation, e.g. the memory used by the array builders; now we account for it. So under the same memory settings, you are more likely to hit the memory pool limit. Have you tried increasing the Comet memory, e.g. via `spark.comet.memoryOverhead`?
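
For context, the spill-and-retry loop in the diff above can be sketched in isolation. This is a hypothetical, simplified model: `MemoryPool`, `append_rows`, and `append_with_retry` here are illustrative stand-ins, not the actual DataFusion Comet types, but the `AppendRowStatus` shape and the free-then-retry flow mirror the PR.

```rust
// Hypothetical sketch of the spill-and-retry pattern: builders grow a
// memory reservation as rows are appended, and when the pool limit is
// hit the caller spills, frees the reservation, and resumes from the
// reported row index.

#[derive(Debug, PartialEq)]
enum AppendRowStatus {
    /// All remaining rows were appended; carries the memory delta in bytes.
    MemDiff(isize),
    /// Ran out of memory; resume appending from this row index after a spill.
    StartIndex(usize),
}

/// Illustrative stand-in for the shared memory reservation.
struct MemoryPool {
    capacity: usize,
    used: usize,
}

impl MemoryPool {
    /// Reserve `bytes` if it still fits under the pool's capacity.
    fn try_grow(&mut self, bytes: usize) -> bool {
        if self.used + bytes <= self.capacity {
            self.used += bytes;
            true
        } else {
            false
        }
    }

    /// Release the whole reservation (the role of `self.reservation.free()`).
    fn free(&mut self) {
        self.used = 0;
    }
}

/// Append rows one at a time, growing the reservation per row; on failure,
/// report how far we got so the caller can spill and retry from there.
fn append_rows(pool: &mut MemoryPool, row_sizes: &[usize], start: usize) -> AppendRowStatus {
    let mut mem_diff: isize = 0;
    for (i, &size) in row_sizes.iter().enumerate().skip(start) {
        if !pool.try_grow(size) {
            return AppendRowStatus::StartIndex(i);
        }
        mem_diff += size as isize;
    }
    AppendRowStatus::MemDiff(mem_diff)
}

/// The retry loop, mirroring the shape of `append_rows_to_partition`.
/// Assumes every row fits in an empty pool (the real code would surface
/// an error rather than loop forever).
fn append_with_retry(pool: &mut MemoryPool, row_sizes: &[usize]) -> isize {
    let mut start = 0;
    loop {
        match append_rows(pool, row_sizes, start) {
            AppendRowStatus::MemDiff(diff) => return diff,
            AppendRowStatus::StartIndex(new_start) => {
                // "Spill" buffered data, then free the reservation and retry.
                pool.free();
                start = new_start;
            }
        }
    }
}

fn main() {
    // Pool of 10 bytes with 8 already reserved elsewhere: the first attempt
    // fails immediately, spills, and then appends over two passes.
    let mut pool = MemoryPool { capacity: 10, used: 8 };
    let diff = append_with_retry(&mut pool, &[4, 4, 4]);
    println!("final attempt appended {} bytes", diff);
}
```

This also shows why reporting builder memory to the reservation changes behavior: once `try_grow` accounts for every appended row, a pool that previously looked idle fills up sooner and triggers spills earlier.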
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]