milenkovicm commented on code in PR #4202:
URL: https://github.com/apache/arrow-datafusion/pull/4202#discussion_r1024184397
##########
datafusion/core/src/physical_plan/aggregates/row_hash.rs:
##########
@@ -285,21 +334,40 @@ fn group_aggregate_batch(
indices: vec![row as u32], // 1.3
};
let group_idx = group_states.len();
- group_states.push(group_state);
- groups_with_rows.push(group_idx);
+
+ // NOTE: do NOT include the `RowGroupState` struct size in
here because this is captured by
+ // `group_states` (see allocation down below)
+ allocated += (std::mem::size_of::<u8>()
+ * group_state.group_by_values.capacity())
+ + (std::mem::size_of::<u8>()
+ * group_state.aggregation_buffer.capacity())
+ + (std::mem::size_of::<u32>() *
group_state.indices.capacity());
// for hasher function, use precomputed hash value
- map.insert(hash, (hash, group_idx), |(hash, _group_idx)|
*hash);
+ map.insert_accounted(
+ (hash, group_idx),
+ |(hash, _group_index)| *hash,
+ &mut allocated,
+ );
+
+ group_states.push_accounted(group_state, &mut allocated);
+
+ groups_with_rows.push(group_idx);
}
};
}
+ // allocate memory
+ // This happens AFTER we actually used the memory, but simplifies the
whole accounting and we are OK with
+ // overshooting a bit. Also this means we either store the whole
record batch or not.
+ memory_consumer.alloc(allocated).await?;
Review Comment:
As I mentioned above, should this call go before the return statement? If it
triggers a spill, the internal state should be consistent.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]