emilk opened a new pull request, #6787:
URL: https://github.com/apache/arrow-rs/pull/6787
# Which issue does this PR close?
* Closes #6360
# Rationale for this change
Concatenating many arrow buffers incrementally can lead to situations where
the buffers are using much more memory than they need (their capacity is larger
than their lengths).
Example:
```rs
use arrow::{
array::{Array, ArrayRef, ListArray, PrimitiveArray},
buffer::OffsetBuffer,
datatypes::{Field, UInt8Type},
};
fn main() {
let array0: PrimitiveArray<UInt8Type> = (0..200 * 300)
.map(|v| (v % 255) as u8)
.collect::<Vec<_>>()
.into();
let array0: ArrayRef = Arc::new(array0);
let (global, local) = memory_use(|| {
let concatenated = concatenate(array0.clone());
dbg!(concatenated.data_type());
eprintln!("expected: ~{}", how_many_bytes(concatenated.clone()));
concatenated
});
eprintln!("global: {global} bytes");
eprintln!("local: {local} bytes");
eprintln!("---");
let array1 = ListArray::new(
Field::new_list_field(array0.data_type().clone(), false).into(),
OffsetBuffer::from_lengths(std::iter::once(array0.len())),
array0.clone(),
None,
);
let array1: ArrayRef = Arc::new(array1);
let (global, local) = memory_use(|| {
let concatenated = concatenate(array1.clone()).shrink_to_fit();
dbg!(concatenated.data_type());
eprintln!("expected: ~{}", how_many_bytes(concatenated.clone()));
concatenated
});
eprintln!("global: {global} bytes");
eprintln!("local: {local} bytes");
}
fn concatenate(array: ArrayRef) -> ArrayRef {
let mut concatenated = array.clone();
for _ in 0..1000 {
concatenated =
arrow::compute::kernels::concat::concat(&[&*concatenated, &*array]).unwrap();
}
concatenated
}
fn how_many_bytes(array: ArrayRef) -> u64 {
let mut array = array;
loop {
match array.data_type() {
arrow::datatypes::DataType::UInt8 => break,
arrow::datatypes::DataType::List(_) => {
let list =
array.as_any().downcast_ref::<ListArray>().unwrap();
array = list.values().clone();
}
_ => unreachable!(),
}
}
array.len() as _
}
// --- Memory tracking ---
use std::sync::{
atomic::{AtomicUsize, Ordering::Relaxed},
Arc,
};
static LIVE_BYTES_GLOBAL: AtomicUsize = AtomicUsize::new(0);
thread_local! {
static LIVE_BYTES_IN_THREAD: AtomicUsize = const { AtomicUsize::new(0)
} ;
}
pub struct TrackingAllocator {
allocator: std::alloc::System,
}
#[global_allocator]
pub static GLOBAL_ALLOCATOR: TrackingAllocator = TrackingAllocator {
allocator: std::alloc::System,
};
#[allow(unsafe_code)]
// SAFETY:
// We just do book-keeping and then let another allocator do all the actual
work.
unsafe impl std::alloc::GlobalAlloc for TrackingAllocator {
#[allow(clippy::let_and_return)]
unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 {
LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_add(layout.size(),
Relaxed));
LIVE_BYTES_GLOBAL.fetch_add(layout.size(), Relaxed);
// SAFETY:
// Just deferring
unsafe { self.allocator.alloc(layout) }
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) {
LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_sub(layout.size(),
Relaxed));
LIVE_BYTES_GLOBAL.fetch_sub(layout.size(), Relaxed);
// SAFETY:
// Just deferring
unsafe { self.allocator.dealloc(ptr, layout) };
}
}
fn live_bytes_local() -> usize {
LIVE_BYTES_IN_THREAD.with(|bytes| bytes.load(Relaxed))
}
fn live_bytes_global() -> usize {
LIVE_BYTES_GLOBAL.load(Relaxed)
}
/// Returns `(num_bytes_allocated, num_bytes_allocated_by_this_thread)`.
fn memory_use<R>(run: impl Fn() -> R) -> (usize, usize) {
let used_bytes_start_local = live_bytes_local();
let used_bytes_start_global = live_bytes_global();
let ret = run();
let bytes_used_local = live_bytes_local() - used_bytes_start_local;
let bytes_used_global = live_bytes_global() - used_bytes_start_global;
drop(ret);
(bytes_used_global, bytes_used_local)
}
```
If you run this you will see 12 MB is used for 6 MB of data.
# What changes are included in this PR?
This PR adds `shrink_to_fit` to `Array` and all buffers.
# Are there any user-facing changes?
`trait Array` now has a `fn shrink_to_fit(&self) -> ArrayRef`.
# Problems
I could not implement `Array::shrink_to_fit` for `TypedDictionaryArray` nor
`TypedRunArray`, since they are types wrapping references. Calling
`shrink_to_fit` on these will result in an `unimplemented!` panic. Perhaps we
should return an error instead.
Due to the above problem, perhaps we should consider an alternative approach?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]