alamb commented on issue #15028:
URL: https://github.com/apache/datafusion/issues/15028#issuecomment-2703686348

   > Parquet writer consumed most of the memory and triggered the allocation failure. The memory reserved is too small for Parquet writer to hold row groups in memory before flushing out to disk.
   
   This is a great find / analysis -- thank you @ivankelly and @Kontinuation 
   
   We actually found something like this upstream in InfluxDB IOx (kudos to @wiedld) and have a solution.
   
   The Parquet writer can report its memory usage via [`ArrowWriter::memory_size`](https://docs.rs/parquet/latest/parquet/arrow/arrow_writer/struct.ArrowWriter.html#method.memory_size)
   
   So what we did was add code that reports that usage to the memory pool here:
   
   
https://github.com/influxdata/influxdb3_core/blob/26a30bf8d6e2b6b3f1dd905c4ec27e3db6e20d5f/parquet_file/src/writer.rs#L28-L99
   
   ```rust
   
   /// Wraps an [`ArrowWriter`] to track its buffered memory in a
   /// DataFusion [`MemoryPool`]
   ///
   /// If the memory used by the `ArrowWriter` exceeds the memory allowed
   /// by the `MemoryPool`, subsequent writes will fail.
   ///
   /// Note no attempt is made to cap the memory used by the
   /// `ArrowWriter` (for example by flushing earlier), which might be a
   /// useful exercise.
   #[derive(Debug)]
   pub struct TrackedMemoryArrowWriter<W: Write + Send> {
       /// The inner ArrowWriter
       inner: ArrowWriter<W>,
       /// DataFusion memory reservation tracking the writer's buffered bytes
       reservation: MemoryReservation,
   }
   
   impl<W: Write + Send> TrackedMemoryArrowWriter<W> {
       /// Create a new `TrackedMemoryArrowWriter`
       pub fn try_new(
           sink: W,
           schema: SchemaRef,
           props: WriterProperties,
           pool: Arc<dyn MemoryPool>,
       ) -> Result<Self> {
           let inner = ArrowWriter::try_new(sink, schema, Some(props))?;
           let consumer = MemoryConsumer::new("IOx ParquetWriter (TrackedMemoryArrowWriter)");
           let reservation = consumer.register(&pool);
   
           Ok(Self { inner, reservation })
       }
   
       /// Push a `RecordBatch` into the underlying writer, updating the
       /// tracked allocation
       pub fn write(&mut self, batch: RecordBatch) -> Result<()> {
           // writer encodes the batch into its internal buffers
           let result = self.inner.write(&batch);
   
           // In progress memory, in bytes
           let in_progress_size = self.inner.memory_size();
   
           // update the allocation with the pool.
           let reservation_result = self
               .reservation
               .try_resize(in_progress_size)
               .map_err(Error::OutOfMemory);
   
           // Log any errors
           if let Err(e) = &reservation_result {
               warn!(
                   %e,
                   in_progress_size,
                   in_progress_rows = self.inner.in_progress_rows(),
                   existing_allocation = self.reservation.size(),
                   "Could not allocate sufficient buffer memory for writing parquet data"
               );
           }
   
           reservation_result?;
           result?;
           Ok(())
       }
   
       /// closes the writer, flushing any remaining data and returning
       /// the written [`FileMetaData`]
       ///
       /// [`FileMetaData`]: parquet::format::FileMetaData
       pub fn close(self) -> Result<parquet::format::FileMetaData> {
           // reservation is returned on drop
           Ok(self.inner.close()?)
       }
   }
   ```
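
The doc comment above mentions that capping the writer's memory by flushing earlier "might be a useful exercise". Here is a minimal, std-only sketch of that idea, not the IOx or DataFusion implementation. `Pool`, `Reservation`, `TrackedWriter` and `demo` are hypothetical stand-ins invented for illustration; in real code their roles would presumably be played by `MemoryPool`, `MemoryReservation::try_resize`, and `ArrowWriter::memory_size` / `ArrowWriter::flush`. It assumes that flushing releases the buffered bytes, which may not hold exactly for the real writer.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a memory pool with a fixed byte limit.
#[derive(Debug)]
pub struct Pool {
    limit: usize,
    used: Mutex<usize>,
}

impl Pool {
    pub fn new(limit: usize) -> Arc<Self> {
        Arc::new(Self { limit, used: Mutex::new(0) })
    }

    /// Bytes currently reserved across all consumers.
    pub fn used(&self) -> usize {
        *self.used.lock().unwrap()
    }
}

/// Hypothetical stand-in for one consumer's reservation in the pool.
#[derive(Debug)]
pub struct Reservation {
    pool: Arc<Pool>,
    size: usize,
}

impl Reservation {
    pub fn new(pool: &Arc<Pool>) -> Self {
        Self { pool: Arc::clone(pool), size: 0 }
    }

    /// Set the reservation to `new_size` bytes, or fail if that would
    /// push the pool past its limit. Shrinking always succeeds.
    pub fn try_resize(&mut self, new_size: usize) -> Result<(), String> {
        let mut used = self.pool.used.lock().unwrap();
        let new_used = *used - self.size + new_size;
        if new_used > self.pool.limit {
            return Err(format!("pool limit {} exceeded: need {}", self.pool.limit, new_used));
        }
        *used = new_used;
        self.size = new_size;
        Ok(())
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        // Return everything to the pool when the consumer goes away.
        *self.pool.used.lock().unwrap() -= self.size;
    }
}

/// Hypothetical buffering writer that reports its buffered bytes to the
/// pool after every write and flushes early when the pool says no.
#[derive(Debug)]
pub struct TrackedWriter {
    buffered: Vec<u8>,
    flushed_bytes: usize,
    reservation: Reservation,
}

impl TrackedWriter {
    pub fn new(pool: &Arc<Pool>) -> Self {
        Self { buffered: Vec::new(), flushed_bytes: 0, reservation: Reservation::new(pool) }
    }

    /// Bytes currently held in memory (plays the role of `memory_size`).
    pub fn memory_size(&self) -> usize {
        self.buffered.len()
    }

    pub fn flushed_bytes(&self) -> usize {
        self.flushed_bytes
    }

    pub fn write(&mut self, row: &[u8]) -> Result<(), String> {
        // As in the wrapper above, the data is buffered first and the
        // reservation is updated afterwards.
        self.buffered.extend_from_slice(row);
        let size = self.memory_size();
        if self.reservation.try_resize(size).is_err() {
            // Instead of failing the write, flush to release the buffer.
            self.flush()?;
        }
        Ok(())
    }

    /// Pretend to write the buffer out, then shrink the reservation to zero.
    pub fn flush(&mut self) -> Result<(), String> {
        self.flushed_bytes += self.buffered.len();
        self.buffered.clear();
        self.reservation.try_resize(0)
    }
}

/// Writes three 4-byte rows against a 10-byte pool; the third write
/// overshoots the limit and triggers an early flush.
pub fn demo() -> (usize, usize, usize) {
    let pool = Pool::new(10);
    let mut writer = TrackedWriter::new(&pool);
    writer.write(b"aaaa").unwrap();
    writer.write(b"bbbb").unwrap();
    let used_before = pool.used();
    writer.write(b"cccc").unwrap();
    (used_before, pool.used(), writer.flushed_bytes())
}
```

The trade-off in this variant is that memory pressure turns into smaller flushed units rather than an error, which for Parquet would likely mean smaller row groups.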
   
   Maybe we could use the same code / approach when DataFusion makes its arrow writer here:
   
   
https://github.com/apache/datafusion/blob/da4293323032e2408c9e3b9b28e644a96aea0f13/datafusion/datasource-parquet/src/file_format.rs#L922-L944
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

