teh-cmc opened a new pull request, #6300:
URL: https://github.com/apache/arrow-rs/pull/6300

   The extra capacity that is initially reserved to append data efficiently to the mutable buffer ends up leaking into the final immutable buffer, where it lingers indefinitely, hogging memory.
   
   ---
   
   Consider this code:
   ```rust
   use arrow::{
       array::{Array, ArrayRef, ListArray, PrimitiveArray},
       buffer::OffsetBuffer,
       datatypes::{Field, UInt8Type},
   };
   
   fn main() {
       let array0: PrimitiveArray<UInt8Type> = (0..200 * 300)
           .map(|v| (v % 255) as u8)
           .collect::<Vec<_>>()
           .into();
       let array0: ArrayRef = Arc::new(array0);
   
       let (global, local) = memory_use(|| {
           let concatenated = concatenate(array0.clone());
           dbg!(concatenated.data_type());
           eprintln!("expected: ~{}", how_many_bytes(concatenated.clone()));
           concatenated
       });
       eprintln!("global: {global} bytes");
       eprintln!("local: {local} bytes");
   
       eprintln!("---");
   
       let array1 = ListArray::new(
           Field::new_list_field(array0.data_type().clone(), false).into(),
           OffsetBuffer::from_lengths(std::iter::once(array0.len())),
           array0.clone(),
           None,
       );
       let array1: ArrayRef = Arc::new(array1);
   
       let (global, local) = memory_use(|| {
           let concatenated = concatenate(array1.clone());
           dbg!(concatenated.data_type());
           eprintln!("expected: ~{}", how_many_bytes(concatenated.clone()));
           concatenated
       });
       eprintln!("global: {global} bytes");
       eprintln!("local: {local} bytes");
   }
   
   fn concatenate(array: ArrayRef) -> ArrayRef {
       let mut concatenated = array.clone();
   
       for _ in 0..1000 {
           concatenated =
               arrow::compute::kernels::concat::concat(&[&*concatenated, &*array]).unwrap();
       }
   
       concatenated
   }
   
   fn how_many_bytes(array: ArrayRef) -> u64 {
       let mut array = array;
       loop {
           match array.data_type() {
               arrow::datatypes::DataType::UInt8 => break,
               arrow::datatypes::DataType::List(_) => {
                   let list = array.as_any().downcast_ref::<ListArray>().unwrap();
                   array = list.values().clone();
               }
               _ => unreachable!(),
           }
       }
   
       array.len() as _
   }
   
   // --- Memory tracking ---
   
   use std::sync::{
       atomic::{AtomicUsize, Ordering::Relaxed},
       Arc,
   };
   
   static LIVE_BYTES_GLOBAL: AtomicUsize = AtomicUsize::new(0);
   
   thread_local! {
       static LIVE_BYTES_IN_THREAD: AtomicUsize = AtomicUsize::new(0);
   }
   
   pub struct TrackingAllocator {
       allocator: std::alloc::System,
   }
   
   #[global_allocator]
   pub static GLOBAL_ALLOCATOR: TrackingAllocator = TrackingAllocator {
       allocator: std::alloc::System,
   };
   
   #[allow(unsafe_code)]
   // SAFETY:
   // We just do book-keeping and then let another allocator do all the actual work.
   unsafe impl std::alloc::GlobalAlloc for TrackingAllocator {
       #[allow(clippy::let_and_return)]
       unsafe fn alloc(&self, layout: std::alloc::Layout) -> *mut u8 {
           LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_add(layout.size(), Relaxed));
           LIVE_BYTES_GLOBAL.fetch_add(layout.size(), Relaxed);
   
           // SAFETY:
           // Just deferring
           unsafe { self.allocator.alloc(layout) }
       }
   
       unsafe fn dealloc(&self, ptr: *mut u8, layout: std::alloc::Layout) {
           LIVE_BYTES_IN_THREAD.with(|bytes| bytes.fetch_sub(layout.size(), Relaxed));
           LIVE_BYTES_GLOBAL.fetch_sub(layout.size(), Relaxed);
   
           // SAFETY:
           // Just deferring
           unsafe { self.allocator.dealloc(ptr, layout) };
       }
   }
   
   fn live_bytes_local() -> usize {
       LIVE_BYTES_IN_THREAD.with(|bytes| bytes.load(Relaxed))
   }
   
   fn live_bytes_global() -> usize {
       LIVE_BYTES_GLOBAL.load(Relaxed)
   }
   
   /// Returns `(num_bytes_allocated, num_bytes_allocated_by_this_thread)`.
   fn memory_use<R>(run: impl Fn() -> R) -> (usize, usize) {
       let used_bytes_start_local = live_bytes_local();
       let used_bytes_start_global = live_bytes_global();
       let ret = run();
       let bytes_used_local = live_bytes_local() - used_bytes_start_local;
       let bytes_used_global = live_bytes_global() - used_bytes_start_global;
       drop(ret);
       (bytes_used_global, bytes_used_local)
   }
   ```
   
   `HEAD`:
   ```
   [src/main.rs:16:9] concatenated.data_type() = UInt8
   expected: ~60060000
   global: 60060200 bytes
   local: 60060200 bytes
   ---
   [src/main.rs:35:9] concatenated.data_type() = List(
       Field {
           name: "item",
           data_type: UInt8,
           nullable: false,
           dict_id: 0,
           dict_is_ordered: false,
           metadata: {},
       },
   )
   expected: ~60060000
   global: 120004384 bytes
   local: 120004384 bytes
   ```
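   
   For reference, the `expected` figure follows directly from the repro itself: the primitive array holds 200 * 300 one-byte values, and after 1000 self-concatenations the result contains 1001 copies of it:
   
   ```
   200 * 300 * (1000 + 1) = 60_060_000 bytes
   ```
   
   On `HEAD`, the flat `UInt8` case stays within a few hundred bytes of that, while the `List` case retains roughly twice the expected amount.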
   
   This patch:
   ```
   [src/main.rs:16:9] concatenated.data_type() = UInt8
   expected: ~60060000
   global: 60060200 bytes
   local: 60060200 bytes
   ---
   [src/main.rs:35:9] concatenated.data_type() = List(
       Field {
           name: "item",
           data_type: UInt8,
           nullable: false,
           dict_id: 0,
           dict_is_ordered: false,
           metadata: {},
       },
   )
   expected: ~60060000
   global: 60064416 bytes
   local: 60064416 bytes
   ```

