teh-cmc opened a new pull request, #6300:
URL: https://github.com/apache/arrow-rs/pull/6300
The extra capacity that is initially used to efficiently append data to the
mutable buffer ends up leaking into the final immutable buffer, where it will
linger indefinitely, thereby hogging memory.
---
Consider this code:
```rust
use arrow::{
array::{Array, ArrayRef, ListArray, PrimitiveArray},
buffer::OffsetBuffer,
datatypes::{Field, UInt8Type},
};
fn main() {
    // Repeatedly concatenates `input` with itself (see `concatenate`) inside
    // `memory_use`, then prints the bytes still alive next to the number of
    // payload bytes the result actually holds.
    let measure_and_report = |input: &ArrayRef| {
        let (global, local) = memory_use(|| {
            let concatenated = concatenate(Arc::clone(input));
            dbg!(concatenated.data_type());
            eprintln!("expected: ~{}", how_many_bytes(Arc::clone(&concatenated)));
            concatenated
        });
        eprintln!("global: {global} bytes");
        eprintln!("local: {local} bytes");
    };

    // Case 1: a flat `UInt8` array of 200 * 300 values.
    let flat: PrimitiveArray<UInt8Type> = (0..200 * 300)
        .map(|v| (v % 255) as u8)
        .collect::<Vec<_>>()
        .into();
    let flat: ArrayRef = Arc::new(flat);
    measure_and_report(&flat);

    eprintln!("---");

    // Case 2: the same values wrapped in a single-row, non-nullable list.
    let nested: ArrayRef = Arc::new(ListArray::new(
        Field::new_list_field(flat.data_type().clone(), false).into(),
        OffsetBuffer::from_lengths(std::iter::once(flat.len())),
        Arc::clone(&flat),
        None,
    ));
    measure_and_report(&nested);
}
/// Appends `array` to a running result 1000 times, starting from a copy of
/// `array`, so the returned array holds 1001 back-to-back copies of its values.
///
/// Panics if arrow's `concat` kernel returns an error.
fn concatenate(array: ArrayRef) -> ArrayRef {
    (0..1000).fold(array.clone(), |accumulated, _| {
        arrow::compute::kernels::concat::concat(&[&*accumulated, &*array]).unwrap()
    })
}
/// Descends through nested `List` arrays down to the innermost `UInt8`
/// values and returns how many there are (one byte per value).
///
/// # Panics
/// Panics (`unreachable!`) on any data type other than `List` or `UInt8`,
/// and if a `List`-typed array does not downcast to `ListArray`.
fn how_many_bytes(array: ArrayRef) -> u64 {
    use arrow::datatypes::DataType;

    let mut current = array;
    while !matches!(current.data_type(), DataType::UInt8) {
        if !matches!(current.data_type(), DataType::List(_)) {
            unreachable!();
        }
        let child = current
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap()
            .values()
            .clone();
        current = child;
    }
    current.len() as _
}
// --- Memory tracking ---
use std::sync::{
atomic::{AtomicUsize, Ordering::Relaxed},
Arc,
};
/// Net number of bytes allocated minus freed through [`TrackingAllocator`],
/// summed over all threads.
static LIVE_BYTES_GLOBAL: AtomicUsize = AtomicUsize::new(0);

thread_local! {
    /// Net number of bytes allocated minus freed through [`TrackingAllocator`]
    /// by the current thread. Atomic `fetch_sub` wraps, so this may wrap
    /// around when a thread frees memory that another thread allocated.
    ///
    /// `const` initialization means no lazy-initialization code ever runs from
    /// inside the allocator when a thread first touches this counter.
    static LIVE_BYTES_IN_THREAD: AtomicUsize = const { AtomicUsize::new(0) };
}

/// A global allocator that forwards every request to the system allocator and
/// keeps `LIVE_BYTES_GLOBAL` / `LIVE_BYTES_IN_THREAD` up to date.
pub struct TrackingAllocator {
    allocator: std::alloc::System,
}

/// Routes every heap allocation of this program through [`TrackingAllocator`].
#[global_allocator]
pub static GLOBAL_ALLOCATOR: TrackingAllocator = TrackingAllocator {
    allocator: std::alloc::System,
};

#[allow(unsafe_code)]
// SAFETY:
// We just do book-keeping and then let another allocator do all the actual work.
//
// `realloc` and `alloc_zeroed` keep the trait's default implementations. Those
// defaults are built on `alloc`/`dealloc` below, so they are tracked as well.
unsafe impl std::alloc::GlobalAlloc for TrackingAllocator {
    unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 {
        // SAFETY:
        // Just deferring: our caller upholds `GlobalAlloc::alloc`'s contract,
        // which is exactly what the inner allocator requires.
        let ptr = unsafe { std::alloc::GlobalAlloc::alloc(&self.allocator, layout) };
        // A null pointer means the allocation failed. Nothing is live and
        // `dealloc` will never be called for it, so it must not be counted.
        if !ptr.is_null() {
            LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_add(layout.size(), Relaxed));
            LIVE_BYTES_GLOBAL.fetch_add(layout.size(), Relaxed);
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) {
        LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_sub(layout.size(), Relaxed));
        LIVE_BYTES_GLOBAL.fetch_sub(layout.size(), Relaxed);
        // SAFETY:
        // Just deferring: per `GlobalAlloc::dealloc`'s contract, `ptr` came from
        // this allocator (and therefore from the inner one) with this `layout`.
        unsafe { std::alloc::GlobalAlloc::dealloc(&self.allocator, ptr, layout) };
    }
}
/// Current value of the calling thread's live-byte counter
/// (`LIVE_BYTES_IN_THREAD`).
fn live_bytes_local() -> usize {
    LIVE_BYTES_IN_THREAD.with(|counter| counter.load(Relaxed))
}

/// Current value of the process-wide live-byte counter (`LIVE_BYTES_GLOBAL`).
fn live_bytes_global() -> usize {
    let counter = &LIVE_BYTES_GLOBAL;
    counter.load(Relaxed)
}
/// Runs `run` and measures how much heap memory it left alive.
///
/// Returns `(num_bytes_allocated, num_bytes_allocated_by_this_thread)`.
/// These are the net growth of the global and per-thread live-byte counters
/// between just before `run` starts and just after it returns. The sample is
/// taken *before* `run`'s return value is dropped, so memory owned by the
/// result is included.
///
/// The counters wrap on overflow, so differences are taken modulo `usize`.
/// A net decrease is reported as `0` instead of panicking on underflow.
fn memory_use<R>(run: impl FnOnce() -> R) -> (usize, usize) {
    /// Net growth from `start` to `end` of a wrapping counter; a net decrease
    /// yields `0`.
    fn net_growth(start: usize, end: usize) -> usize {
        let delta = end.wrapping_sub(start);
        // A wrapped difference above `isize::MAX` is really a negative delta.
        if delta > isize::MAX as usize {
            0
        } else {
            delta
        }
    }

    let used_bytes_start_local = live_bytes_local();
    let used_bytes_start_global = live_bytes_global();
    let ret = run();
    // Sample while `ret` is still alive so that memory it owns is counted.
    let bytes_used_local = net_growth(used_bytes_start_local, live_bytes_local());
    let bytes_used_global = net_growth(used_bytes_start_global, live_bytes_global());
    drop(ret);
    (bytes_used_global, bytes_used_local)
}
```
`HEAD`:
```
[src/main.rs:16:9] concatenated.data_type() = UInt8
expected: ~60060000
global: 60060200 bytes
local: 60060200 bytes
---
[src/main.rs:35:9] concatenated.data_type() = List(
Field {
name: "item",
data_type: UInt8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
)
expected: ~60060000
global: 120004384 bytes
local: 120004384 bytes
```
This patch:
```
[src/main.rs:16:9] concatenated.data_type() = UInt8
expected: ~60060000
global: 60060200 bytes
local: 60060200 bytes
---
[src/main.rs:35:9] concatenated.data_type() = List(
Field {
name: "item",
data_type: UInt8,
nullable: false,
dict_id: 0,
dict_is_ordered: false,
metadata: {},
},
)
expected: ~60060000
global: 60064416 bytes
local: 60064416 bytes
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]