AdamGS opened a new issue, #4365:
URL: https://github.com/apache/arrow-rs/issues/4365

   **Describe the bug**
   Reading a Parquet file that has a List column with items of type `Utf8`
   panics when a row filter is applied to another column, due to a length
   mismatch between arrays when building the underlying StructArray.
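
   To make the invariant concrete, here is a minimal std-only sketch. It is
   not arrow-rs code: `ListColumn`, `select`, and `build_struct` are
   hypothetical names, and it only models the symptom described above (children
   of one struct ending up with different row counts), not the actual code
   path inside the reader.

   ```rust
   /// Toy list-of-strings column: row i owns values[offsets[i]..offsets[i + 1]].
   #[derive(Debug, Clone, PartialEq)]
   struct ListColumn {
       offsets: Vec<usize>,
       values: Vec<String>,
   }

   impl ListColumn {
       /// Number of rows (one fewer than the number of offsets).
       fn len(&self) -> usize {
           self.offsets.len() - 1
       }

       /// Keep only the rows whose mask entry is true, rebuilding the offsets.
       fn select(&self, mask: &[bool]) -> ListColumn {
           let mut offsets = vec![0];
           let mut values = Vec::new();
           for (row, keep) in mask.iter().enumerate() {
               if *keep {
                   let items = &self.values[self.offsets[row]..self.offsets[row + 1]];
                   values.extend_from_slice(items);
                   offsets.push(values.len());
               }
           }
           ListColumn { offsets, values }
       }
   }

   /// Hypothetical stand-in for the length check done when assembling struct children.
   fn build_struct(list: &ListColumn, numbers: &[u32]) -> Result<usize, String> {
       if list.len() != numbers.len() {
           return Err(format!(
               "length mismatch: list has {} rows, numbers has {}",
               list.len(),
               numbers.len()
           ));
       }
       Ok(numbers.len())
   }

   fn main() {
       let numbers: Vec<u32> = (0..8).collect();
       let list = ListColumn {
           offsets: (0..=8).collect(),
           values: (0..8).map(|i| i.to_string()).collect(),
       };
       // Row filter on the numeric column only.
       let mask: Vec<bool> = numbers.iter().map(|n| *n > 3).collect();
       let filtered_numbers: Vec<u32> = numbers
           .iter()
           .copied()
           .zip(&mask)
           .filter(|(_, keep)| **keep)
           .map(|(n, _)| n)
           .collect();

       // Selection applied to one child only: the children disagree on length.
       println!("{:?}", build_struct(&list, &filtered_numbers));
       // Selection applied to both children: the lengths agree.
       println!("{:?}", build_struct(&list.select(&mask), &filtered_numbers));
   }
   ```

   In this model the first call returns an error and the second succeeds,
   which is the behaviour I would expect the reader to guarantee for every
   projected column.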
   
   **To Reproduce**
   This is a pretty small example I built:
   ```rust
   use arrow::array::{ListBuilder, StringBuilder, UInt32Array};
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::record_batch::RecordBatch;
   use futures::StreamExt;
   use std::sync::Arc;
   
   use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
   use parquet::arrow::async_reader::AsyncFileReader;
   use parquet::arrow::ParquetRecordBatchStreamBuilder;
   use parquet::arrow::{ArrowWriter, ProjectionMask};
   use parquet::file::properties::WriterProperties;
   use std::error::Error;
   
   use tempfile::NamedTempFile;
   
   const BATCH_SIZE: usize = 1024;
   
   #[tokio::main]
   async fn main() -> Result<(), Box<dyn Error>> {
       let schema = Arc::new(Schema::new(vec![
           Field::new(
               "list",
               DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
               false,
           ),
           Field::new("numbers", DataType::UInt32, false),
       ]));
       let temp_file = NamedTempFile::new()?;
   
       let mut writer = ArrowWriter::try_new(
           temp_file.reopen()?,
           schema.clone(),
           Some(WriterProperties::builder().build()),
       )?;
       for _ in 0..2 {
           let mut list_a_builder = ListBuilder::new(StringBuilder::new());
           for i in 0..1024 {
               list_a_builder.values().append_value(format!("{i}"));
   
               list_a_builder.append(true);
           }
           let batch = RecordBatch::try_new(
               schema.clone(),
               vec![
                   Arc::new(list_a_builder.finish()),
                   Arc::new(UInt32Array::from_iter_values(
                       (0..BATCH_SIZE).map(|n| n as u32),
                   )),
               ],
           )?;
           writer.write(&batch)?;
       }
       let _metadata = writer.close()?;
   
       let mut file = tokio::fs::File::open(temp_file.path()).await.unwrap();
   
       let parquet_metadata = file.get_metadata().await.unwrap();
       let file_metadata = parquet_metadata.file_metadata();
       let schema_descriptor = file_metadata.schema_descr();
   
       // We filter on the numerical column
       let row_filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
           ProjectionMask::leaves(schema_descriptor, vec![schema.fields().len() - 1]),
           |batch| arrow::compute::gt_dyn_scalar(batch.column(0), 100),
       ))]);
   
       // This is the key section: whether we materialize the list-of-strings
       // column alone or together with the column that was filtered.
       #[cfg(feature = "with_bug")]
       let projection_mask = ProjectionMask::roots(schema_descriptor, [0, 1]); // Both columns

       #[cfg(not(feature = "with_bug"))]
       let projection_mask = ProjectionMask::roots(schema_descriptor, [0]); // Just the list-of-strings column
   
       let mut reader = ParquetRecordBatchStreamBuilder::new(file)
           .await
           .unwrap()
           .with_row_filter(row_filter)
           .with_projection(projection_mask)
           .build()
           .unwrap();
   
       while let Some(rb) = reader.next().await {
           let rb = rb.unwrap();
           println!("count = {}", rb.num_rows())
       }
   
       Ok(())
   }
   ```
   
   The `Cargo.toml` file looks like this:
   ```toml
   [package]
   name = "reproduce"
   version = "0.1.0"
   edition = "2021"
   
   [features]
   with_bug = []
   
   # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
   
   [dependencies]
   arrow = { version = "40", features = ["simd"] }
   parquet = { version = "40", features = ["async"] }
   tempfile = "*"
   tokio = { version = "1.16.1", features = [
       "macros",
       "rt-multi-thread",
       "time",
       "fs",
   ] }
   futures = "0.3.18"
   ```
   
   Run it with `cargo run --features with_bug` to trigger the panic; a plain `cargo run` projects only the list column.
   
   **Expected behavior**
   My understanding is that this should work. The example itself contains a
   working case: with the same row filter, projecting only the list column
   succeeds.
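
   For reference, the row count I would expect across all returned batches
   can be worked out from the example alone. This is a std-only sketch:
   `rows_passing` and `BATCHES_WRITTEN` are names I made up for illustration,
   and since how the reader splits its output into batches is not something I
   am relying on, only the total is meaningful.

   ```rust
   // Mirrors the example: each written batch holds numbers 0..BATCH_SIZE.
   const BATCH_SIZE: u32 = 1024;
   // The example writes the same batch twice.
   const BATCHES_WRITTEN: usize = 2;

   /// Rows in one written batch whose `numbers` value is strictly greater than `threshold`.
   fn rows_passing(threshold: u32) -> usize {
       (0..BATCH_SIZE).filter(|n| *n > threshold).count()
   }

   fn main() {
       let per_batch = rows_passing(100);
       // Values 101..=1023 pass the filter: 923 per written batch, 1846 in total.
       println!(
           "per written batch = {per_batch}, total = {}",
           per_batch * BATCHES_WRITTEN
       );
   }
   ```

   So the `count = ...` lines printed by the example should sum to 1846,
   whichever columns are projected.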
   
   **Additional context**
   I would be glad to take on the effort; I'm just not sure what the right
   way to tackle this issue is. It seems to be somewhere relatively deep in the
   `Parquet`

