Copilot commented on code in PR #7971: URL: https://github.com/apache/arrow-rs/pull/7971#discussion_r2258245705
########## parquet/tests/arrow_reader/io/mod.rs: ########## @@ -0,0 +1,712 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for IO read patterns in the Parquet Reader +//! +//! Each test: +//! 1. Creates a temporary Parquet file with a known row group structure +//! 2. Reads data from that file using the Arrow Parquet Reader, recording the IO operations +//! 3. Asserts the expected IO patterns based on the read operations +//! +//! Note this module contains test infrastructure only. The actual tests are in the +//! sub-modules [`sync_reader`] and [`async_reader`]. +//! +//! Key components: +//! - [`TestParquetFile`] - Represents a Parquet file and its layout +//! - [`OperationLog`] - Records IO operations performed on the file +//! - [`LogEntry`] - Represents a single IO operation in the log + +mod sync_reader; + +#[cfg(feature = "async")] +mod async_reader; + +use arrow::compute::and; +use arrow::compute::kernels::cmp::{gt, lt}; +use arrow_array::cast::AsArray; +use arrow_array::types::Int64Type; +use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, StringViewArray}; +use bytes::Bytes; +use parquet::arrow::arrow_reader::{ + ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter, +}; +use parquet::arrow::{ArrowWriter, ProjectionMask}; +use parquet::data_type::AsBytes; +use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetOffsetIndex}; +use parquet::file::properties::WriterProperties; +use parquet::file::FOOTER_SIZE; +use parquet::format::PageLocation; +use parquet::schema::types::SchemaDescriptor; +use std::collections::BTreeMap; +use std::fmt::Display; +use std::ops::Range; +use std::sync::{Arc, LazyLock, Mutex}; + +/// Create a new `TestParquetFile` with: +/// 3 columns: "a", "b", "c" +/// +/// 2 row groups, each with 200 rows +/// each data page has 100 rows +/// +/// Values of column "a" are 0..399 +/// Values of column "b" are 400..799 +/// Values of column "c" are alternating strings of length 12 and longer +fn test_file() -> TestParquetFile { + TestParquetFile::new(TEST_FILE_DATA.clone()) +} + +/// Default options for tests +/// +/// Note these tests use the PageIndex to reduce IO +fn test_options() -> ArrowReaderOptions { + ArrowReaderOptions::default().with_page_index(true) +} + +/// Return a row filter that evaluates "b > 575" AND "b < 625" +/// +/// last data page in Row Group 0 and first DataPage in Row Group 1 +fn filter_b_575_625(schema_descr: &SchemaDescriptor) -> RowFilter { + // "b" > 575 and "b" < 625 + let predicate = ArrowPredicateFn::new( + ProjectionMask::columns(schema_descr, ["b"]), + |batch: RecordBatch| { + let scalar_575 = Int64Array::new_scalar(575); + let scalar_625 = Int64Array::new_scalar(625); + let column = batch.column(0).as_primitive::<Int64Type>(); + and(>(column, &scalar_575)?, <(column, &scalar_625)?) + }, + ); + RowFilter::new(vec![Box::new(predicate)]) +} + +/// Filter a > 175 and b < 625 +/// First filter: "a" > 175 (last data page in Row Group 0) +/// Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1) +fn filter_a_175_b_625(schema_descr: &SchemaDescriptor) -> RowFilter { + // "a" > 175 and "b" < 625 + let predicate_a = ArrowPredicateFn::new( + ProjectionMask::columns(schema_descr, ["a"]), + |batch: RecordBatch| { + let scalar_175 = Int64Array::new_scalar(175); + let column = batch.column(0).as_primitive::<Int64Type>(); + gt(column, &scalar_175) + }, + ); + + let predicate_b = ArrowPredicateFn::new( + ProjectionMask::columns(schema_descr, ["b"]), + |batch: RecordBatch| { + let scalar_625 = Int64Array::new_scalar(625); + let column = batch.column(0).as_primitive::<Int64Type>(); + lt(column, &scalar_625) + }, + ); + + RowFilter::new(vec![Box::new(predicate_a), Box::new(predicate_b)]) +} + +/// Filter FALSE (no rows) with b +/// Entirely filters out both row groups +/// Note it selects "b" +fn filter_b_false(schema_descr: &SchemaDescriptor) -> RowFilter { + // "false" + let predicate = ArrowPredicateFn::new( + ProjectionMask::columns(schema_descr, ["b"]), + |batch: RecordBatch| { + let result = + BooleanArray::from_iter(std::iter::repeat_n(Some(false), batch.num_rows())); + Ok(result) + }, + ); + RowFilter::new(vec![Box::new(predicate)]) +} + +/// Create a parquet file in memory for testing. See [`test_file`] for details. +static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| { + // Input batch has 400 rows, with 3 columns: "a", "b", "c" + // Note c is a different types (so the data page sizes will be different) + let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400)); + let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800)); + let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| { + if i % 2 == 0 { + format!("string_{i}") + } else { + format!("A string larger than 12 bytes and thus not inlined {i}") + } + }))); + + let input_batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap(); + + let mut output = Vec::new(); + + let writer_options = WriterProperties::builder() + .set_max_row_group_size(200) + .set_data_page_row_count_limit(100) + .build(); + let mut writer = + ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap(); + + // since the limits are only enforced on batch boundaries, write the input + // batch in chunks of 50 + let mut row_remain = input_batch.num_rows(); + while row_remain > 0 { + let chunk_size = row_remain.min(50); + let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size); + writer.write(&chunk).unwrap(); + row_remain -= chunk_size; + } + writer.close().unwrap(); + Bytes::from(output) +}); + +/// A test parquet file and its layout. +struct TestParquetFile { + bytes: Bytes, + /// The operation log for IO operations performed on this file + ops: Arc<OperationLog>, + /// The (pre-parsed) parquet metadata for this file + parquet_metadata: Arc<ParquetMetaData>, +} + +impl TestParquetFile { + /// Create a new `TestParquetFile` with the specified temporary directory and path + /// and determines the row group layout. + fn new(bytes: Bytes) -> Self { + // Read the parquet file to determine its layout + let builder = ParquetRecordBatchReaderBuilder::try_new_with_options( + bytes.clone(), + ArrowReaderOptions::default().with_page_index(true), + ) + .unwrap(); + + let parquet_metadata = Arc::clone(builder.metadata()); + + let offset_index = parquet_metadata + .offset_index() + .expect("Parquet metadata should have a page index"); + + let row_groups = TestRowGroups::new(&parquet_metadata, offset_index); + + // figure out the footer location in the file + let footer_location = bytes.len() - FOOTER_SIZE..bytes.len(); + let footer = bytes.slice(footer_location.clone()); + let footer: &[u8; FOOTER_SIZE] = footer + .as_bytes() + .try_into() // convert to a fixed size array + .unwrap(); + + // figure out the metadata location + let footer = ParquetMetaDataReader::decode_footer_tail(footer).unwrap(); + let metadata_len = footer.metadata_length(); + let metadata_location = footer_location.start - metadata_len..footer_location.start; + + let ops = Arc::new(OperationLog::new( + footer_location, + metadata_location, + row_groups, + )); + + TestParquetFile { + bytes, + ops, + parquet_metadata, + } + } + + /// Return the internal bytes of the parquet file + fn bytes(&self) -> &Bytes { + &self.bytes + } + + /// Return the operation log for this file + fn ops(&self) -> &Arc<OperationLog> { + &self.ops + } + + /// Return the parquet metadata for this file + fn parquet_metadata(&self) -> &Arc<ParquetMetaData> { + &self.parquet_metadata + } +} + +/// Information about a column chunk +#[derive(Debug)] +struct TestColumnChunk { + /// The name of the column + name: String, + + /// The location of the entire column chunk in the file including data pages Review Comment: The comment contains a typo where 'data pages' appears twice. The first occurrence should likely be 'dictionary pages' to match the context of describing both dictionary and data pages. ```suggestion /// The location of the entire column chunk in the file including dictionary pages ``` ########## parquet/tests/arrow_reader/io/async_reader.rs: ########## @@ -0,0 +1,424 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for the async reader ([`ParquetRecordBatchStreamBuilder`]) + +use crate::io::{ + filter_a_175_b_625, filter_b_575_625, filter_b_false, test_file, test_options, LogEntry, + OperationLog, TestParquetFile, +}; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt}; +use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector}; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::errors::Result; +use parquet::file::metadata::ParquetMetaData; +use std::ops::Range; +use std::sync::Arc; + +#[tokio::test] +async fn test_read_entire_file() { + let test_file = test_file(); + // Expect to see IO for all data pages for each row group and column + let builder = async_builder(&test_file, test_options()).await; + run_test( + &test_file, + builder, + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'a': MultiPage(dictionary_page: true, : data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 0, column 'b': MultiPage(dictionary_page: true, : data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 0, column 'c': MultiPage(dictionary_page: true, : data_pages: [0, 1]) (7346 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'a': MultiPage(dictionary_page: true, : data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, : data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'c': MultiPage(dictionary_page: true, : data_pages: [0, 1]) (7456 bytes, 1 requests) [data]", Review Comment: The output string contains a malformed format with an extra colon and space after 'dictionary_page: true,'. This appears to be the result of the format string bug in the Display implementation for PageType::Multi. ```suggestion " Row Group 0, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", " Row Group 0, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", " Row Group 0, column 'c': MultiPage(dictionary_page: true, data_pages: [0, 1]) (7346 bytes, 1 requests) [data]", "Read Multi:", " Row Group 1, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", " Row Group 1, column 'c': MultiPage(dictionary_page: true, data_pages: [0, 1]) (7456 bytes, 1 requests) [data]", ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org