marekgalovic commented on issue #4921:
URL: https://github.com/apache/arrow-rs/issues/4921#issuecomment-1760048200
@tustvold unfortunately, I cannot share the file that fails. Below is an
example that reproduces what I suspect is the underlying issue, although it fails with a
slightly different error since there are no predicates. Using only the first
column in the projection mask, it correctly reads 9667 rows, but when I add the
second column (or use the second column alone) it fails with:
```
ArrowError("Parquet argument error: Parquet error: Invalid offset in sparse
column chunk data: 804367")
```
```rust
use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, ListArray, ListBuilder, StringBuilder, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use futures::StreamExt;
use itertools::Itertools;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector};
use parquet::arrow::{ArrowWriter, ParquetRecordBatchStreamBuilder, ProjectionMask};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use tempdir::TempDir;

/// Builds a `ListArray` of UTF-8 values; `None` inputs become empty lists.
pub fn build_list_array<I: Iterator<Item = impl Into<Option<Vec<String>>>>>(
    values: I,
) -> ListArray {
    let mut list_builder = ListBuilder::new(StringBuilder::new());
    for s in values {
        if let Some(v) = s.into() {
            for value in v.into_iter() {
                list_builder.values().append_value(value);
            }
        }
        list_builder.append(true);
    }
    list_builder.finish()
}

#[tokio::main]
async fn main() {
    let schema = Arc::new(Schema::new(vec![
        Field::new("col_1", DataType::UInt64, false),
        Field::new(
            "col_2",
            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
            true,
        ),
    ]));

    // Writer properties: small data pages with page-level statistics so the
    // page index is written, no dictionary encoding or bloom filters.
    let props = WriterProperties::builder()
        .set_write_batch_size(1024)
        .set_data_page_row_count_limit(1024)
        .set_max_row_group_size(100_000)
        .set_statistics_enabled(EnabledStatistics::Page)
        .set_dictionary_enabled(false)
        .set_bloom_filter_enabled(false);

    // Write 200_000 rows in batches of 1024.
    let workdir = TempDir::new("parquet").unwrap();
    let file_path = workdir.path().join("data.parquet");
    let mut writer = ArrowWriter::try_new(
        File::create(&file_path).unwrap(),
        schema.clone(),
        Some(props.build()),
    )
    .unwrap();

    (0..200_000_u64).chunks(1024).into_iter().for_each(|ids| {
        let ids: Vec<_> = ids.collect();
        let list_vals = ids
            .iter()
            .map(|id| match id % 3 {
                0 => Some(vec!["val_1".to_string(), format!("id_{id}")]),
                1 => Some(vec![format!("id_{id}")]),
                _ => None,
            })
            .collect_vec();
        let refs = vec![
            Arc::new(UInt64Array::from(ids)) as ArrayRef,
            Arc::new(build_list_array(list_vals.into_iter())) as ArrayRef,
        ];
        let batch = RecordBatch::try_new(schema.clone(), refs).unwrap();
        writer.write(&batch).unwrap();
    });
    writer.close().unwrap();

    // Read the first row group back with the page index enabled and a sparse
    // row selection.
    let mut reader = ParquetRecordBatchStreamBuilder::new_with_options(
        tokio::fs::File::open(&file_path).await.unwrap(),
        ArrowReaderOptions::new().with_page_index(true),
    )
    .await
    .unwrap();

    let proj_mask = ProjectionMask::roots(reader.parquet_schema(), [0, 1]);
    reader = reader
        .with_projection(proj_mask)
        .with_batch_size(1024)
        .with_row_groups(vec![0])
        .with_row_selection(RowSelection::from(vec![
            RowSelector::skip(16313),
            RowSelector::select(3569),
            RowSelector::skip(48237),
            RowSelector::select(6097),
            RowSelector::skip(25783),
            RowSelector::select(1),
        ]));

    let mut stream = reader.build().unwrap();
    let mut total_rows = 0;
    while let Some(rb) = stream.next().await {
        let rb = rb.unwrap();
        total_rows += rb.num_rows();
    }
    println!("read rows: {total_rows}");
}
```
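For reference, the selectors above select 3569 + 6097 + 1 = 9667 rows from the first (100 000-row) row group, which matches the count I get when only the first column is projected. The only change between the working and failing runs is the projection mask, roughly:

```rust
// Works: project only col_1 -> reads 9667 rows.
let proj_mask = ProjectionMask::roots(reader.parquet_schema(), [0]);

// Fails with "Invalid offset in sparse column chunk data" when the list
// column is included, either together with col_1 or on its own:
// let proj_mask = ProjectionMask::roots(reader.parquet_schema(), [0, 1]);
// let proj_mask = ProjectionMask::roots(reader.parquet_schema(), [1]);
```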