davidhewitt opened a new issue, #7243:
URL: https://github.com/apache/arrow-rs/issues/7243

   **Describe the bug**
   
   When reading a dictionary-encoded string column from Parquet, we get an index overflow error, even though the data was within valid bounds when it was written.
   
   **To Reproduce**
   
   See the minimal reproduction below:
   
   ```
   use std::sync::Arc;
   
   use arrow::{
       array::{DictionaryArray, PrimitiveArray, RecordBatch, StringArray},
       datatypes::{DataType, Field, Int32Type, Schema},
   };
   use bytes::{BufMut, BytesMut};
   use parquet::arrow::{
       ArrowWriter,
       arrow_reader::{ArrowReaderMetadata, ParquetRecordBatchReaderBuilder},
   };
   
   fn main() {
       // Schema is single column, dictionary-encoded string
       let schema = Arc::new(Schema::new(vec![Field::new(
           "a",
           DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
           true,
       )]));
   
       // First batch contains a single non-null row, with a massive 4MB string
       let batch1 = RecordBatch::try_new(
           schema.clone(),
           vec![Arc::new(DictionaryArray::new(
               PrimitiveArray::<Int32Type>::from(vec![Some(0)]),
               Arc::new(StringArray::from(vec![Some("a".repeat(4_000_000))])),
           ))],
       )
       .unwrap();
   
       // Second batch is 600 nulls
       let batch2 = RecordBatch::try_new(
           schema.clone(),
           vec![Arc::new(DictionaryArray::new(
               PrimitiveArray::<Int32Type>::from(vec![None; 600]),
               Arc::new(StringArray::new_null(0)),
           ))],
       )
       .unwrap();
   
       // Third batch is a single non-null row, it doesn't matter what value is used
       // for this reproduction.
       let batch3 = RecordBatch::try_new(
           schema.clone(),
           vec![Arc::new(DictionaryArray::new(
               PrimitiveArray::<Int32Type>::from(vec![Some(0)]),
               Arc::new(StringArray::from(vec![Some("b")])),
           ))],
       )
       .unwrap();
   
       // Write the batches to parquet format
       let buf = BytesMut::new();
       let mut writer = ArrowWriter::try_new(buf.writer(), schema.clone(), None).unwrap();
       for batch in [batch1, batch2, batch3] {
           // Flush after each batch, this has the effect of placing each batch
           // into its own row group
           writer.write(&batch).unwrap();
           writer.flush().unwrap();
       }
       let buf = writer.into_inner().unwrap().into_inner().freeze();
   
       // Sanity check the metadata
       let meta = ArrowReaderMetadata::load(&buf, Default::default()).unwrap();
       let row_group_meta = meta.metadata().row_groups();
       let mut running_total = 0;
       for (i, rg_meta) in row_group_meta.iter().enumerate() {
           running_total += rg_meta.num_rows();
           println!(
               "row_group[{i}]={:>4}, total={running_total}",
               rg_meta.num_rows()
           );
       }
   
       // Read the parquet file back
       let mut reader = ParquetRecordBatchReaderBuilder::try_new(buf)
           .unwrap()
           .build()
           .unwrap();
   
       // default batch size is large enough that the parquet reader merges the row
       // groups into a single batch, and this merging op triggers the index overflow
       reader.next().unwrap().unwrap();
   }
   ```
   
   **Expected behavior**
   
   The reader should not overflow.
   
   **Additional context**
   
   I have done some debugging to arrive at this reproduction. Here's my analysis:
   
   - The parquet reader merges row groups to reach the desired batch size. In this case, 602 rows is smaller than the default batch size, so it reads all three row groups as a single `RecordBatch`.
   - Doing so requires merging the three dictionaries.
   - The second row group contributes 600 null rows, so when the third row group is read the accumulated state is 601 values, of which only the first is non-null: the 4MB string.
   - To merge the third dictionary, the current state is flattened. This has the effect of allocating 600 × 4MB in the values buffer for the nulls, which overflows the `i32` offsets (see the rough arithmetic sketch below).
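   
   Roughly, the arithmetic behind that overflow (a sketch; the per-null 4MB reservation is inferred from my debugging rather than measured):
   
   ```
   fn main() {
       // 600 null keys in the accumulated state, each of which ends up reserving
       // space for the single 4MB value when the dictionary is flattened
       // (assumption about the flattening behaviour, inferred from debugging).
       let nulls: i64 = 600;
       let value_len: i64 = 4_000_000;
       let flattened = nulls * value_len; // 2_400_000_000 bytes
       // Utf8 arrays use i32 offsets, so this no longer fits.
       assert!(flattened > i32::MAX as i64); // 2_400_000_000 > 2_147_483_647
   }
   ```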
   
   This is obviously a bit of an edge case; however, we are hitting it in production at Pydantic.
   
   We can apply mitigations on our side, such as truncating the 4MB value, limiting batch sizes, or perhaps switching from dictionaries to string views.
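   
   For example, capping the batch size is a one-line change to the reader construction in the reproduction above (a sketch; 100 is an arbitrary illustrative value):
   
   ```
   // Same builder as in the reproduction, but with an explicit batch size so each
   // output RecordBatch only combines a bounded number of rows (and therefore a
   // bounded amount of accumulated dictionary state).
   let mut reader = ParquetRecordBatchReaderBuilder::try_new(buf)
       .unwrap()
       .with_batch_size(100)
       .build()
       .unwrap();
   ```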
   
   I think there may also be a possible mitigation in the `parquet` crate: a reader option to not merge row groups? That might also be a nice efficiency win for decoding.
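   
   For comparison, the merging can already be avoided from the caller's side by decoding each row group with its own reader (a sketch using the `buf` from the reproduction; `with_row_groups` restricts which row groups a reader decodes, and the per-row-group loop is my own workaround, not a proposed API):
   
   ```
   // Workaround sketch: one reader per row group, so dictionaries from different
   // row groups are never merged into a single output batch.
   let metadata = ArrowReaderMetadata::load(&buf, Default::default()).unwrap();
   for i in 0..metadata.metadata().num_row_groups() {
       let reader = ParquetRecordBatchReaderBuilder::try_new(buf.clone())
           .unwrap()
           .with_row_groups(vec![i])
           .build()
           .unwrap();
       for batch in reader {
           let batch = batch.unwrap();
           println!("row_group[{i}] decoded {} rows", batch.num_rows());
       }
   }
   ```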

