alamb commented on issue #15028: URL: https://github.com/apache/datafusion/issues/15028#issuecomment-2703686348
> Parquet writer consumed most of the memory and triggered the allocation failure. The memory reserved is too small for the Parquet writer to hold row groups in memory before flushing out to disk.

This is a great find / analysis -- thank you @ivankelly and @Kontinuation. We actually found something like this upstream in InfluxDB IOx (kudos to @wiedld) and have a solution.

The Parquet writer can report its memory usage via [`ArrowWriter::memory_size`](https://docs.rs/parquet/latest/parquet/arrow/arrow_writer/struct.ArrowWriter.html#method.memory_size), so what we did was add code that reports that usage to the memory pool here: https://github.com/influxdata/influxdb3_core/blob/26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f/parquet_file/src/writer.rs#L28-L99

```rust
// Imports shown for context (paths assumed; `Result`, `Error`, and the
// `warn!` macro are IOx-internal and elided in the linked excerpt):
use std::{io::Write, sync::Arc};

use arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};

/// Wraps an [`ArrowWriter`] to track its buffered memory in a
/// DataFusion [`MemoryPool`]
///
/// If the memory used by the `ArrowWriter` exceeds the memory allowed
/// by the `MemoryPool`, subsequent writes will fail.
///
/// Note no attempt is made to cap the memory used by the
/// `ArrowWriter` (for example by flushing earlier), which might be a
/// useful exercise.
#[derive(Debug)]
pub struct TrackedMemoryArrowWriter<W: Write + Send> {
    /// The inner ArrowWriter
    inner: ArrowWriter<W>,
    /// DataFusion memory reservation tracking the writer's buffered memory
    reservation: MemoryReservation,
}

impl<W: Write + Send> TrackedMemoryArrowWriter<W> {
    /// create a new `TrackedMemoryArrowWriter`
    pub fn try_new(
        sink: W,
        schema: SchemaRef,
        props: WriterProperties,
        pool: Arc<dyn MemoryPool>,
    ) -> Result<Self> {
        let inner = ArrowWriter::try_new(sink, schema, Some(props))?;
        let consumer = MemoryConsumer::new("IOx ParquetWriter (TrackedMemoryArrowWriter)");
        let reservation = consumer.register(&pool);

        Ok(Self { inner, reservation })
    }

    /// Push a `RecordBatch` into the underlying writer, updating the
    /// tracked allocation
    pub fn write(&mut self, batch: RecordBatch) -> Result<()> {
        // writer encodes the batch into its internal buffers
        let result = self.inner.write(&batch);

        // In progress memory, in bytes
        let in_progress_size = self.inner.memory_size();

        // update the allocation with the pool.
        let reservation_result = self
            .reservation
            .try_resize(in_progress_size)
            .map_err(Error::OutOfMemory);

        // Log any errors
        if let Err(e) = &reservation_result {
            warn!(
                %e,
                in_progress_size,
                in_progress_rows = self.inner.in_progress_rows(),
                existing_allocation = self.reservation.size(),
                "Could not allocate sufficient buffer memory for writing parquet data"
            );
        }
        reservation_result?;
        result?;
        Ok(())
    }

    /// closes the writer, flushing any remaining data and returning
    /// the written [`FileMetaData`]
    ///
    /// [`FileMetaData`]: parquet::format::FileMetaData
    pub fn close(self) -> Result<parquet::format::FileMetaData> {
        // reservation is returned on drop
        Ok(self.inner.close()?)
    }
}
```

Maybe we could use the same code / approach when DataFusion makes its arrow writer here: https://github.com/apache/datafusion/blob/da4293323032e2408c9e3b9b28e644a96aea0f13/datafusion/datasource-parquet/src/file_format.rs#L922-L944
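For completeness, here is a minimal sketch of how such a wrapper could be driven end to end, assuming the `TrackedMemoryArrowWriter` above is in scope and that its `Error` type converts into a boxed error. The pool size, schema, and output path are illustrative, and `GreedyMemoryPool` is just one of DataFusion's pool implementations:

```rust
use std::{fs::File, sync::Arc};

use arrow::{
    array::{ArrayRef, Int64Array},
    datatypes::{DataType, Field, Schema},
    record_batch::RecordBatch,
};
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryPool};
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Cap buffered row groups at 64 MiB (illustrative size); `try_resize`
    // inside `write` fails once this budget would be exceeded.
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(64 * 1024 * 1024));

    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let sink = File::create("/tmp/tracked.parquet")?; // hypothetical output path

    let mut writer = TrackedMemoryArrowWriter::try_new(
        sink,
        Arc::clone(&schema),
        WriterProperties::builder().build(),
        Arc::clone(&pool),
    )?;

    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![1, 2, 3])) as ArrayRef],
    )?;

    // Each write resizes the reservation to the writer's current
    // in-progress size, so the pool always sees the buffered row groups.
    writer.write(batch)?;
    writer.close()?;
    Ok(())
}
```

The key point is that the reservation is resized on every `write`, so a pool shared with the rest of the query can arbitrate between the Parquet writer and other memory consumers instead of the writer allocating untracked memory until the process fails.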