viirya commented on code in PR #988:
URL: https://github.com/apache/datafusion-comet/pull/988#discussion_r1797416210
##########
native/core/src/execution/datafusion/shuffle_writer.rs:
##########
@@ -206,58 +207,74 @@ struct PartitionBuffer {
/// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`,
/// the active array builders will be frozen and appended to frozen buffer `frozen`.
batch_size: usize,
+ /// Memory reservation for this partition buffer.
+ reservation: MemoryReservation,
}
impl PartitionBuffer {
- fn new(schema: SchemaRef, batch_size: usize) -> Self {
+ fn new(
+ schema: SchemaRef,
+ batch_size: usize,
+ partition_id: usize,
+ runtime: &Arc<RuntimeEnv>,
+ ) -> Self {
+ let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", partition_id))
+ .with_can_spill(true)
+ .register(&runtime.memory_pool);
+
Self {
schema,
frozen: vec![],
active: vec![],
active_slots_mem_size: 0,
num_active_rows: 0,
batch_size,
+ reservation,
}
}
/// Initializes active builders if necessary.
+ /// Returns error if memory reservation fails.
fn init_active_if_necessary(&mut self) -> Result<isize> {
let mut mem_diff = 0;
if self.active.is_empty() {
- self.active = new_array_builders(&self.schema, self.batch_size);
+ // Estimate the memory size of active builders
if self.active_slots_mem_size == 0 {
self.active_slots_mem_size = self
- .active
+ .schema
+ .fields()
.iter()
- .zip(self.schema.fields())
- .map(|(_ab, field)| slot_size(self.batch_size, field.data_type()))
+ .map(|field| slot_size(self.batch_size, field.data_type()))
.sum::<usize>();
}
+
+ self.reservation.try_grow(self.active_slots_mem_size)?;
+
+ self.active = new_array_builders(&self.schema, self.batch_size);
+
mem_diff += self.active_slots_mem_size as isize;
}
Ok(mem_diff)
}
- /// Appends all rows of given batch into active array builders.
- fn append_batch(&mut self, batch: &RecordBatch, time_metric: &Time) -> Result<isize> {
- let columns = batch.columns();
- let indices = (0..batch.num_rows()).collect::<Vec<usize>>();
- self.append_rows(columns, &indices, time_metric)
- }
-
/// Appends rows of specified indices from columns into active array builders.
fn append_rows(
&mut self,
columns: &[ArrayRef],
indices: &[usize],
+ start_index: usize,
time_metric: &Time,
- ) -> Result<isize> {
+ ) -> Either<Result<isize>, usize> {
Review Comment:
Okay
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]