This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new d7d847a9c0 [Parquet] Add tests for IO/CPU access in parquet reader 
(#7971)
d7d847a9c0 is described below

commit d7d847a9c0b034638c9303dde0af4a82b0a7b8ce
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Aug 15 09:46:26 2025 -0700

    [Parquet] Add tests for IO/CPU access in parquet reader (#7971)
    
    # Which issue does this PR close?
    
    - Part of https://github.com/apache/arrow-rs/issues/8000
    
    - Related to https://github.com/apache/arrow-rs/pull/7850
    
    # Rationale for this change
    
    There is quite a bit of code in the current Parquet sync and async
    readers related to IO patterns that I do not think is not covered by
    existing tests. As I refactor the guts of the readers into the
    PushDecoder, I would like to ensure we don't introduce regressions in
    existing functionality.
    
    I would like to add tests that cover the IO patterns of the Parquet
    Reader so I don't break it
    
    # What changes are included in this PR?
    
    Add tests which
    1. Creates a temporary parquet file with a known row group structure
    2. Reads data from that file using the Arrow Parquet Reader, recording
    the IO operations
    3. Asserts the expected IO patterns based on the read operations in a
    human understandable behavior
    
    This is done for both the sync and async readers.
    
    I am sorry this is such a massive PR, but it is entirely tests and I
    think it is quite important. I could break the sync or async tests into
    their own PR, but this seems uncessary
    
    # Are these changes tested?
    
    Yes, indeed the entire PR is only tests
    
    
    # Are there any user-facing changes?
---
 parquet/Cargo.toml                            |   1 +
 parquet/src/file/reader.rs                    |   9 +-
 parquet/tests/arrow_reader/io/async_reader.rs | 430 ++++++++++++++++
 parquet/tests/arrow_reader/io/mod.rs          | 703 ++++++++++++++++++++++++++
 parquet/tests/arrow_reader/io/sync_reader.rs  | 443 ++++++++++++++++
 parquet/tests/arrow_reader/mod.rs             |   7 +-
 6 files changed, 1586 insertions(+), 7 deletions(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 05557069aa..f601ac7cef 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -78,6 +78,7 @@ base64 = { version = "0.22", default-features = false, 
features = ["std"] }
 criterion = { version = "0.5", default-features = false, features = 
["async_futures"]  }
 snap = { version = "1.0", default-features = false }
 tempfile = { version = "3.0", default-features = false }
+insta = "1.43.1"
 brotli = { version = "8.0", default-features = false, features = ["std"] }
 flate2 = { version = "1.0", default-features = false, features = 
["rust_backend"] }
 lz4_flex = { version = "0.11", default-features = false, features = ["std", 
"frame"] }
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index 7e2b149ad3..61af21a68e 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -48,11 +48,12 @@ pub trait Length {
 /// Generates [`Read`]ers to read chunks of a Parquet data source.
 ///
 /// The Parquet reader uses [`ChunkReader`] to access Parquet data, allowing
-/// multiple decoders to read concurrently from different locations in the 
same file.
+/// multiple decoders to read concurrently from different locations in the same
+/// file.
 ///
-/// The trait provides:
-/// * random access (via [`Self::get_bytes`])
-/// * sequential (via [`Self::get_read`])
+/// The trait functions both as a reader and a factory for readers.
+/// * random access via [`Self::get_bytes`]
+/// * sequential access via the reader returned via factory method 
[`Self::get_read`]
 ///
 /// # Provided Implementations
 /// * [`File`] for reading from local file system
diff --git a/parquet/tests/arrow_reader/io/async_reader.rs 
b/parquet/tests/arrow_reader/io/async_reader.rs
new file mode 100644
index 0000000000..f2d3ce0723
--- /dev/null
+++ b/parquet/tests/arrow_reader/io/async_reader.rs
@@ -0,0 +1,430 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for the async reader ([`ParquetRecordBatchStreamBuilder`])
+
+use crate::io::{
+    filter_a_175_b_625, filter_b_575_625, filter_b_false, test_file, 
test_options, LogEntry,
+    OperationLog, TestParquetFile,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, StreamExt};
+use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, 
RowSelector};
+use parquet::arrow::async_reader::AsyncFileReader;
+use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
+use parquet::errors::Result;
+use parquet::file::metadata::ParquetMetaData;
+use std::ops::Range;
+use std::sync::Arc;
+
+#[tokio::test]
+async fn test_read_entire_file() {
+    // read entire file without any filtering or projection
+    let test_file = test_file();
+    let builder = async_builder(&test_file, test_options()).await;
+    insta::assert_debug_snapshot!(run(
+        &test_file,
+        builder).await, @r#"
+    [
+        "Get Provided Metadata",
+        "Event: Builder Configured",
+        "Event: Reader Built",
+        "Read Multi:",
+        "  Row Group 0, column 'a': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+        "  Row Group 0, column 'b': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+        "  Row Group 0, column 'c': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (7346 bytes, 1 requests) [data]",
+        "Read Multi:",
+        "  Row Group 1, column 'a': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+        "  Row Group 1, column 'b': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+        "  Row Group 1, column 'c': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (7456 bytes, 1 requests) [data]",
+    ]
+    "#);
+}
+
+#[tokio::test]
+async fn test_read_single_group() {
+    let test_file = test_file();
+    let builder = async_builder(&test_file, test_options())
+        .await
+        // read only second row group
+        .with_row_groups(vec![1]);
+
+    // Expect to see only IO for Row Group 1. Should see no IO for Row Group 0.
+    insta::assert_debug_snapshot!(run(
+        &test_file,
+        builder).await, @r#"
+        [
+            "Get Provided Metadata",
+            "Event: Builder Configured",
+            "Event: Reader Built",
+            "Read Multi:",
+            "  Row Group 1, column 'a': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+            "  Row Group 1, column 'b': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+            "  Row Group 1, column 'c': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (7456 bytes, 1 requests) [data]",
+        ]
+    "#);
+}
+
+#[tokio::test]
+async fn test_read_single_column() {
+    let test_file = test_file();
+    let builder = async_builder(&test_file, test_options()).await;
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+    let builder = 
builder.with_projection(ProjectionMask::columns(&schema_descr, ["b"]));
+    // Expect to see only IO for column "b". Should see no IO for columns "a" 
or "c".
+    insta::assert_debug_snapshot!(run(
+        &test_file,
+        builder).await, @r#"
+        [
+            "Get Provided Metadata",
+            "Event: Builder Configured",
+            "Event: Reader Built",
+            "Read Multi:",
+            "  Row Group 0, column 'b': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+            "Read Multi:",
+            "  Row Group 1, column 'b': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+        ]
+    "#);
+}
+
+#[tokio::test]
+async fn test_read_row_selection() {
+    // There are 400 total rows spread across 4 data pages (100 rows each)
+    // select rows 175..225 (i.e. DataPage(1) of row group 0 and DataPage(0) 
of row group 1)
+    let test_file = test_file();
+    let builder = async_builder(&test_file, test_options()).await;
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+    let builder = builder
+        .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
+        .with_row_selection(RowSelection::from(vec![
+            RowSelector::skip(175),
+            RowSelector::select(50),
+        ]));
+
+    // Expect to see only data IO for one page for each column for each row 
group
+    insta::assert_debug_snapshot!(run(
+        &test_file,
+        builder).await, @r#"
+    [
+        "Get Provided Metadata",
+        "Event: Builder Configured",
+        "Event: Reader Built",
+        "Read Multi:",
+        "  Row Group 0, column 'a': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "  Row Group 0, column 'a': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "  Row Group 0, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "  Row Group 0, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Read Multi:",
+        "  Row Group 1, column 'a': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "  Row Group 1, column 'a': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "  Row Group 1, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "  Row Group 1, column 'b': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+    ]
+    "#);
+}
+
+#[tokio::test]
+async fn test_read_limit() {
+    // There are 400 total rows spread across 4 data pages (100 rows each)
+    // a limit of 125 rows should only fetch the first two data pages 
(DataPage(0) and DataPage(1)) from row group 0
+    let test_file = test_file();
+    let builder = async_builder(&test_file, test_options()).await;
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+    let builder = builder
+        .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
+        .with_limit(125);
+
+    insta::assert_debug_snapshot!(run(
+        &test_file,
+        builder).await, @r#"
+    [
+        "Get Provided Metadata",
+        "Event: Builder Configured",
+        "Event: Reader Built",
+        "Read Multi:",
+        "  Row Group 0, column 'a': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "  Row Group 0, column 'a': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "  Row Group 0, column 'a': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+    ]
+    "#);
+}
+
+#[tokio::test]
+async fn test_read_single_row_filter() {
+    // Values from column "b" range 400..799
+    // filter  "b" > 575 and < than 625
+    // (last data page in Row Group 0 and first DataPage in Row Group 1)
+    let test_file = test_file();
+    let builder = async_builder(&test_file, test_options()).await;
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+    let builder = builder
+        .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
+        .with_row_filter(filter_b_575_625(&schema_descr));
+
+    // Expect to see I/O for column b in both row groups to evaluate filter,
+    // then a single pages for the "a" column in each row group
+    insta::assert_debug_snapshot!(run(
+        &test_file,
+        builder).await, @r#"
+        [
+            "Get Provided Metadata",
+            "Event: Builder Configured",
+            "Event: Reader Built",
+            "Read Multi:",
+            "  Row Group 0, column 'b': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+            "Read Multi:",
+            "  Row Group 0, column 'a': DictionaryPage   (1617 bytes, 1 
requests) [data]",
+            "  Row Group 0, column 'a': DataPage(1)      (126 bytes , 1 
requests) [data]",
+            "Read Multi:",
+            "  Row Group 1, column 'b': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+            "Read Multi:",
+            "  Row Group 1, column 'a': DictionaryPage   (1617 bytes, 1 
requests) [data]",
+            "  Row Group 1, column 'a': DataPage(0)      (113 bytes , 1 
requests) [data]",
+        ]
+    "#);
+}
+
+#[tokio::test]
+async fn test_read_single_row_filter_no_page_index() {
+    // Values from column "b" range 400..799
+    // Apply a filter  "b" > 575 and <less> than 625
+    // (last data page in Row Group 0 and first DataPage in Row Group 1)
+    let test_file = test_file();
+    let options = test_options().with_page_index(false);
+    let builder = async_builder(&test_file, options).await;
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+    let builder = builder
+        .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
+        .with_row_filter(filter_b_575_625(&schema_descr));
+
+    // Since we don't have the page index, expect to see:
+    // 1. I/O for all pages of column b to evaluate the filter
+    // 2. IO for all pages of column a as the reader doesn't know where the 
page
+    //    boundaries are so needs to scan them.
+    insta::assert_debug_snapshot!(run(
+        &test_file,
+        builder).await, @r#"
+        [
+            "Get Provided Metadata",
+            "Event: Builder Configured",
+            "Event: Reader Built",
+            "Read Multi:",
+            "  Row Group 0, column 'b': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+            "Read Multi:",
+            "  Row Group 0, column 'a': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+            "Read Multi:",
+            "  Row Group 1, column 'b': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+            "Read Multi:",
+            "  Row Group 1, column 'a': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+        ]
+    "#);
+}
+
+#[tokio::test]
+async fn test_read_multiple_row_filter() {
+    // Values in column "a" range 0..399
+    // Values in column "b" range 400..799
+    // First filter: "a" > 175  (last data page in Row Group 0)
+    // Second filter: "b" < 625 (last data page in Row Group 0 and first 
DataPage in RowGroup 1)
+    // Read column "c"
+    let test_file = test_file();
+    let builder = async_builder(&test_file, test_options()).await;
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+    let builder = builder
+        .with_projection(ProjectionMask::columns(&schema_descr, ["c"]))
+        .with_row_filter(filter_a_175_b_625(&schema_descr));
+
+    // Expect that we will see
+    // 1. IO for all pages of column A (to evaluate the first filter)
+    // 2. IO for pages of column b that passed the first filter (to evaluate 
the second filter)
+    // 3. IO after reader is built only for column c for the rows that passed 
both filters
+    insta::assert_debug_snapshot!(run(
+        &test_file,
+        builder).await, @r#"
+        [
+            "Get Provided Metadata",
+            "Event: Builder Configured",
+            "Event: Reader Built",
+            "Read Multi:",
+            "  Row Group 0, column 'a': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+            "Read Multi:",
+            "  Row Group 0, column 'b': DictionaryPage   (1617 bytes, 1 
requests) [data]",
+            "  Row Group 0, column 'b': DataPage(1)      (126 bytes , 1 
requests) [data]",
+            "Read Multi:",
+            "  Row Group 0, column 'c': DictionaryPage   (7107 bytes, 1 
requests) [data]",
+            "  Row Group 0, column 'c': DataPage(1)      (126 bytes , 1 
requests) [data]",
+            "Read Multi:",
+            "  Row Group 1, column 'a': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+            "Read Multi:",
+            "  Row Group 1, column 'b': DictionaryPage   (1617 bytes, 1 
requests) [data]",
+            "  Row Group 1, column 'b': DataPage(0)      (113 bytes , 1 
requests) [data]",
+            "  Row Group 1, column 'b': DataPage(1)      (126 bytes , 1 
requests) [data]",
+            "Read Multi:",
+            "  Row Group 1, column 'c': DictionaryPage   (7217 bytes, 1 
requests) [data]",
+            "  Row Group 1, column 'c': DataPage(0)      (113 bytes , 1 
requests) [data]",
+        ]
+    "#);
+}
+
+#[tokio::test]
+async fn test_read_single_row_filter_all() {
+    // Apply a filter that filters out all rows
+
+    let test_file = test_file();
+    let builder = async_builder(&test_file, test_options()).await;
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+    let builder = builder
+        .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
+        .with_row_filter(filter_b_false(&schema_descr));
+
+    // Expect to see reads for column "b" to evaluate the filter, but no reads
+    // for column "a" as no rows pass the filter
+    insta::assert_debug_snapshot!(run(
+        &test_file,
+        builder).await, @r#"
+        [
+            "Get Provided Metadata",
+            "Event: Builder Configured",
+            "Event: Reader Built",
+            "Read Multi:",
+            "  Row Group 0, column 'b': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+            "Read Multi:",
+            "  Row Group 1, column 'b': MultiPage(dictionary_page: true, 
data_pages: [0, 1])  (1856 bytes, 1 requests) [data]",
+        ]
+    "#);
+}
+
+/// Return a [`ParquetRecordBatchStreamBuilder`] for reading this file
+async fn async_builder(
+    test_file: &TestParquetFile,
+    options: ArrowReaderOptions,
+) -> ParquetRecordBatchStreamBuilder<RecordingAsyncFileReader> {
+    let parquet_meta_data = if options.page_index() {
+        Arc::clone(test_file.parquet_metadata())
+    } else {
+        // strip out the page index from the metadata
+        let metadata = test_file
+            .parquet_metadata()
+            .as_ref()
+            .clone()
+            .into_builder()
+            .set_column_index(None)
+            .set_offset_index(None)
+            .build();
+        Arc::new(metadata)
+    };
+
+    let reader = RecordingAsyncFileReader {
+        bytes: test_file.bytes().clone(),
+        ops: Arc::clone(test_file.ops()),
+        parquet_meta_data,
+    };
+
+    ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
+        .await
+        .unwrap()
+}
+
+/// Build the reader from the specified builder and read all batches from it,
+/// and return the operations log.
+async fn run(
+    test_file: &TestParquetFile,
+    builder: ParquetRecordBatchStreamBuilder<RecordingAsyncFileReader>,
+) -> Vec<String> {
+    let ops = test_file.ops();
+    ops.add_entry(LogEntry::event("Builder Configured"));
+    let mut stream = builder.build().unwrap();
+    ops.add_entry(LogEntry::event("Reader Built"));
+    while let Some(batch) = stream.next().await {
+        match batch {
+            Ok(_) => {}
+            Err(e) => panic!("Error reading batch: {e}"),
+        }
+    }
+    ops.snapshot()
+}
+
+struct RecordingAsyncFileReader {
+    bytes: Bytes,
+    ops: Arc<OperationLog>,
+    parquet_meta_data: Arc<ParquetMetaData>,
+}
+
+impl AsyncFileReader for RecordingAsyncFileReader {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, 
parquet::errors::Result<Bytes>> {
+        let ops = Arc::clone(&self.ops);
+        let data = self
+            .bytes
+            .slice(range.start as usize..range.end as usize)
+            .clone();
+
+        // translate to usize from u64
+        let logged_range = Range {
+            start: range.start as usize,
+            end: range.end as usize,
+        };
+        async move {
+            ops.add_entry_for_range(&logged_range);
+            Ok(data)
+        }
+        .boxed()
+    }
+
+    fn get_byte_ranges(&mut self, ranges: Vec<Range<u64>>) -> BoxFuture<'_, 
Result<Vec<Bytes>>> {
+        let ops = Arc::clone(&self.ops);
+        let datas = ranges
+            .iter()
+            .map(|range| {
+                self.bytes
+                    .slice(range.start as usize..range.end as usize)
+                    .clone()
+            })
+            .collect::<Vec<_>>();
+        // translate to usize from u64
+        let logged_ranges = ranges
+            .into_iter()
+            .map(|r| Range {
+                start: r.start as usize,
+                end: r.end as usize,
+            })
+            .collect::<Vec<_>>();
+
+        async move {
+            ops.add_entry_for_ranges(&logged_ranges);
+            Ok(datas)
+        }
+        .boxed()
+    }
+
+    fn get_metadata<'a>(
+        &'a mut self,
+        _options: Option<&'a ArrowReaderOptions>,
+    ) -> BoxFuture<'a, Result<Arc<ParquetMetaData>>> {
+        let ops = Arc::clone(&self.ops);
+        let parquet_meta_data = Arc::clone(&self.parquet_meta_data);
+        async move {
+            ops.add_entry(LogEntry::GetProvidedMetadata);
+            Ok(parquet_meta_data)
+        }
+        .boxed()
+    }
+}
diff --git a/parquet/tests/arrow_reader/io/mod.rs 
b/parquet/tests/arrow_reader/io/mod.rs
new file mode 100644
index 0000000000..b31f295755
--- /dev/null
+++ b/parquet/tests/arrow_reader/io/mod.rs
@@ -0,0 +1,703 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for IO read patterns in the Parquet Reader
+//!
+//! Each test:
+//! 1. Creates a temporary Parquet file with a known row group structure
+//! 2. Reads data from that file using the Arrow Parquet Reader, recording the 
IO operations
+//! 3. Asserts the expected IO patterns based on the read operations
+//!
+//! Note this module contains test infrastructure only. The actual tests are 
in the
+//! sub-modules [`sync_reader`] and [`async_reader`].
+//!
+//! Key components:
+//! - [`TestParquetFile`] - Represents a Parquet file and its layout
+//! - [`OperationLog`] - Records IO operations performed on the file
+//! - [`LogEntry`] - Represents a single IO operation in the log
+
+mod sync_reader;
+
+#[cfg(feature = "async")]
+mod async_reader;
+
+use arrow::compute::and;
+use arrow::compute::kernels::cmp::{gt, lt};
+use arrow_array::cast::AsArray;
+use arrow_array::types::Int64Type;
+use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, 
StringViewArray};
+use bytes::Bytes;
+use parquet::arrow::arrow_reader::{
+    ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, 
RowFilter,
+};
+use parquet::arrow::{ArrowWriter, ProjectionMask};
+use parquet::data_type::AsBytes;
+use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, 
ParquetOffsetIndex};
+use parquet::file::properties::WriterProperties;
+use parquet::file::FOOTER_SIZE;
+use parquet::format::PageLocation;
+use parquet::schema::types::SchemaDescriptor;
+use std::collections::BTreeMap;
+use std::fmt::Display;
+use std::ops::Range;
+use std::sync::{Arc, LazyLock, Mutex};
+
+/// Create a new `TestParquetFile` with:
+/// 3 columns: "a", "b", "c"
+///
+/// 2 row groups, each with 200 rows
+/// each data page has 100 rows
+///
+/// Values of column "a" are 0..399
+/// Values of column "b" are 400..799
+/// Values of column "c" are alternating strings of length 12 and longer
+fn test_file() -> TestParquetFile {
+    TestParquetFile::new(TEST_FILE_DATA.clone())
+}
+
+/// Default options for tests
+///
+/// Note these tests use the PageIndex to reduce IO
+fn test_options() -> ArrowReaderOptions {
+    ArrowReaderOptions::default().with_page_index(true)
+}
+
+/// Return a row filter that evaluates "b > 575" AND "b < 625"
+///
+/// last data page in Row Group 0 and first DataPage in Row Group 1
+fn filter_b_575_625(schema_descr: &SchemaDescriptor) -> RowFilter {
+    // "b" > 575 and "b" < 625
+    let predicate = ArrowPredicateFn::new(
+        ProjectionMask::columns(schema_descr, ["b"]),
+        |batch: RecordBatch| {
+            let scalar_575 = Int64Array::new_scalar(575);
+            let scalar_625 = Int64Array::new_scalar(625);
+            let column = batch.column(0).as_primitive::<Int64Type>();
+            and(&gt(column, &scalar_575)?, &lt(column, &scalar_625)?)
+        },
+    );
+    RowFilter::new(vec![Box::new(predicate)])
+}
+
+/// Filter a > 175 and b < 625
+/// First filter: "a" > 175  (last data page in Row Group 0)
+/// Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage 
in RowGroup 1)
+fn filter_a_175_b_625(schema_descr: &SchemaDescriptor) -> RowFilter {
+    // "a" > 175 and "b" < 625
+    let predicate_a = ArrowPredicateFn::new(
+        ProjectionMask::columns(schema_descr, ["a"]),
+        |batch: RecordBatch| {
+            let scalar_175 = Int64Array::new_scalar(175);
+            let column = batch.column(0).as_primitive::<Int64Type>();
+            gt(column, &scalar_175)
+        },
+    );
+
+    let predicate_b = ArrowPredicateFn::new(
+        ProjectionMask::columns(schema_descr, ["b"]),
+        |batch: RecordBatch| {
+            let scalar_625 = Int64Array::new_scalar(625);
+            let column = batch.column(0).as_primitive::<Int64Type>();
+            lt(column, &scalar_625)
+        },
+    );
+
+    RowFilter::new(vec![Box::new(predicate_a), Box::new(predicate_b)])
+}
+
+/// Filter FALSE (no rows) with b
+/// Entirely filters out both row groups
+/// Note it selects "b"
+fn filter_b_false(schema_descr: &SchemaDescriptor) -> RowFilter {
+    // "false"
+    let predicate = ArrowPredicateFn::new(
+        ProjectionMask::columns(schema_descr, ["b"]),
+        |batch: RecordBatch| {
+            let result =
+                BooleanArray::from_iter(std::iter::repeat_n(Some(false), 
batch.num_rows()));
+            Ok(result)
+        },
+    );
+    RowFilter::new(vec![Box::new(predicate)])
+}
+
+/// Create a parquet file in memory for testing. See [`test_file`] for details.
+static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
+    // Input batch has 400 rows, with 3 columns: "a", "b", "c"
+    // Note c is a different types (so the data page sizes will be different)
+    let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
+    let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
+    let c: ArrayRef = 
Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
+        if i % 2 == 0 {
+            format!("string_{i}")
+        } else {
+            format!("A string larger than 12 bytes and thus not inlined {i}")
+        }
+    })));
+
+    let input_batch = RecordBatch::try_from_iter(vec![("a", a), ("b", b), 
("c", c)]).unwrap();
+
+    let mut output = Vec::new();
+
+    let writer_options = WriterProperties::builder()
+        .set_max_row_group_size(200)
+        .set_data_page_row_count_limit(100)
+        .build();
+    let mut writer =
+        ArrowWriter::try_new(&mut output, input_batch.schema(), 
Some(writer_options)).unwrap();
+
+    // since the limits are only enforced on batch boundaries, write the input
+    // batch in chunks of 50
+    let mut row_remain = input_batch.num_rows();
+    while row_remain > 0 {
+        let chunk_size = row_remain.min(50);
+        let chunk = input_batch.slice(input_batch.num_rows() - row_remain, 
chunk_size);
+        writer.write(&chunk).unwrap();
+        row_remain -= chunk_size;
+    }
+    writer.close().unwrap();
+    Bytes::from(output)
+});
+
+/// A test parquet file and its layout.
+struct TestParquetFile {
+    bytes: Bytes,
+    /// The operation log for IO operations performed on this file
+    ops: Arc<OperationLog>,
+    /// The (pre-parsed) parquet metadata for this file
+    parquet_metadata: Arc<ParquetMetaData>,
+}
+
+impl TestParquetFile {
+    /// Create a new `TestParquetFile` with the specified temporary directory 
and path
+    /// and determines the row group layout.
+    fn new(bytes: Bytes) -> Self {
+        // Read the parquet file to determine its layout
+        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(
+            bytes.clone(),
+            ArrowReaderOptions::default().with_page_index(true),
+        )
+        .unwrap();
+
+        let parquet_metadata = Arc::clone(builder.metadata());
+
+        let offset_index = parquet_metadata
+            .offset_index()
+            .expect("Parquet metadata should have a page index");
+
+        let row_groups = TestRowGroups::new(&parquet_metadata, offset_index);
+
+        // figure out the footer location in the file
+        let footer_location = bytes.len() - FOOTER_SIZE..bytes.len();
+        let footer = bytes.slice(footer_location.clone());
+        let footer: &[u8; FOOTER_SIZE] = footer
+            .as_bytes()
+            .try_into() // convert to a fixed size array
+            .unwrap();
+
+        // figure out the metadata location
+        let footer = 
ParquetMetaDataReader::decode_footer_tail(footer).unwrap();
+        let metadata_len = footer.metadata_length();
+        let metadata_location = footer_location.start - 
metadata_len..footer_location.start;
+
+        let ops = Arc::new(OperationLog::new(
+            footer_location,
+            metadata_location,
+            row_groups,
+        ));
+
+        TestParquetFile {
+            bytes,
+            ops,
+            parquet_metadata,
+        }
+    }
+
+    /// Return the internal bytes of the parquet file
+    fn bytes(&self) -> &Bytes {
+        &self.bytes
+    }
+
+    /// Return the operation log for this file
+    fn ops(&self) -> &Arc<OperationLog> {
+        &self.ops
+    }
+
+    /// Return the parquet metadata for this file
+    fn parquet_metadata(&self) -> &Arc<ParquetMetaData> {
+        &self.parquet_metadata
+    }
+}
+
+/// Information about a column chunk
+#[derive(Debug)]
+struct TestColumnChunk {
+    /// The name of the column
+    name: String,
+
+    /// The location of the entire column chunk in the file including 
dictionary pages
+    /// and data pages.
+    location: Range<usize>,
+
+    /// The offset of the start of of the dictionary page if any
+    dictionary_page_location: Option<i64>,
+
+    /// The location of the data pages in the file
+    page_locations: Vec<PageLocation>,
+}
+
+/// Information about the pages in a single row group
+#[derive(Debug)]
+struct TestRowGroup {
+    /// Maps column_name -> Information about the column chunk
+    columns: BTreeMap<String, TestColumnChunk>,
+}
+
+/// Information about all the row groups in a Parquet file, extracted from its 
metadata
+#[derive(Debug)]
+struct TestRowGroups {
+    /// List of row groups, each containing information about its columns and 
page locations
+    row_groups: Vec<TestRowGroup>,
+}
+
+impl TestRowGroups {
+    fn new(parquet_metadata: &ParquetMetaData, offset_index: 
&ParquetOffsetIndex) -> Self {
+        let row_groups = parquet_metadata
+            .row_groups()
+            .iter()
+            .enumerate()
+            .map(|(rg_index, rg_meta)| {
+                let columns = rg_meta
+                    .columns()
+                    .iter()
+                    .enumerate()
+                    .map(|(col_idx, col_meta)| {
+                        let column_name = 
col_meta.column_descr().name().to_string();
+                        let page_locations =
+                            
offset_index[rg_index][col_idx].page_locations().to_vec();
+                        let dictionary_page_location = 
col_meta.dictionary_page_offset();
+
+                        // We can find the byte range of the entire column 
chunk
+                        let (start_offset, length) = col_meta.byte_range();
+                        let start_offset = start_offset as usize;
+                        let end_offset = start_offset + length as usize;
+
+                        TestColumnChunk {
+                            name: column_name.clone(),
+                            location: start_offset..end_offset,
+                            dictionary_page_location,
+                            page_locations,
+                        }
+                    })
+                    .map(|test_column_chunk| {
+                        // make key=value pairs to insert into the BTreeMap
+                        (test_column_chunk.name.clone(), test_column_chunk)
+                    })
+                    .collect::<BTreeMap<_, _>>();
+                TestRowGroup { columns }
+            })
+            .collect();
+
+        Self { row_groups }
+    }
+
+    fn iter(&self) -> impl Iterator<Item = &TestRowGroup> {
+        self.row_groups.iter()
+    }
+}
+
+/// Type of data read
+#[derive(Debug, PartialEq)]
+enum PageType {
+    /// The data page with the specified index
+    Data {
+        data_page_index: usize,
+    },
+    Dictionary,
+    /// Multiple pages read together
+    Multi {
+        /// Was the dictionary page included?
+        dictionary_page: bool,
+        /// The data pages included
+        data_page_indices: Vec<usize>,
+    },
+}
+
+impl Display for PageType {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            PageType::Data { data_page_index } => {
+                write!(f, "DataPage({data_page_index})")
+            }
+            PageType::Dictionary => write!(f, "DictionaryPage"),
+            PageType::Multi {
+                dictionary_page,
+                data_page_indices,
+            } => {
+                let dictionary_page = if *dictionary_page {
+                    "dictionary_page: true, "
+                } else {
+                    ""
+                };
+                write!(
+                    f,
+                    "MultiPage({dictionary_page}data_pages: 
{data_page_indices:?})",
+                )
+            }
+        }
+    }
+}
+
+/// Read single logical data object (data page or dictionary page)
+/// in one or more requests
+#[derive(Debug)]
+struct ReadInfo {
+    row_group_index: usize,
+    column_name: String,
+    range: Range<usize>,
+    read_type: PageType,
+    /// Number of distinct requests (function calls) that were used
+    num_requests: usize,
+}
+
+impl Display for ReadInfo {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let Self {
+            row_group_index,
+            column_name,
+            range,
+            read_type,
+            num_requests,
+        } = self;
+
+        // If the average read size is less than 10 bytes, assume it is the 
thrift
+        // decoder reading the page headers and add an annotation
+        let annotation = if (range.len() / num_requests) < 10 {
+            " [header]"
+        } else {
+            " [data]"
+        };
+
+        // align the read type to 20 characters for better readability, not 
sure why
+        // this does not work inline with write! macro below
+        write!(
+            f,
+            "Row Group {row_group_index}, column '{column_name}': {:15}  
({:10}, {:8}){annotation}",
+            // convert to strings so alignment works
+            format!("{read_type}"),
+            format!("{} bytes", range.len()),
+            format!("{num_requests} requests"),
+        )
+    }
+}
+
+/// Store structured entries in the log to make it easier to combine multiple 
entries
+#[derive(Debug)]
+enum LogEntry {
+    /// Read the footer (last 8 bytes) of the parquet file
+    ReadFooter(Range<usize>),
+    /// Read the metadata of the parquet file
+    ReadMetadata(Range<usize>),
+    /// Access previously parsed metadata
+    GetProvidedMetadata,
+    /// Read a single logical data object
+    ReadData(ReadInfo),
+    /// Read one or more logical data objects in a single operation
+    ReadMultipleData(Vec<LogEntry>),
+    /// Not known where the read came from
+    Unknown(Range<usize>),
+    /// A user defined event
+    Event(String),
+}
+
+impl LogEntry {
+    fn event(event: impl Into<String>) -> Self {
+        LogEntry::Event(event.into())
+    }
+
+    /// Appends a string representation of this log entry to the output vector
+    fn append_string(&self, output: &mut Vec<String>, indent: usize) {
+        let indent_str = " ".repeat(indent);
+        match self {
+            LogEntry::ReadFooter(range) => {
+                output.push(format!("{indent_str}Footer: {} bytes", 
range.len()))
+            }
+            LogEntry::ReadMetadata(range) => {
+                output.push(format!("{indent_str}Metadata: {}", range.len()))
+            }
+            LogEntry::GetProvidedMetadata => {
+                output.push(format!("{indent_str}Get Provided Metadata"))
+            }
+            LogEntry::ReadData(read_info) => 
output.push(format!("{indent_str}{read_info}")),
+            LogEntry::ReadMultipleData(read_infos) => {
+                output.push(format!("{indent_str}Read Multi:"));
+                for read_info in read_infos {
+                    let new_indent = indent + 2;
+                    read_info.append_string(output, new_indent);
+                }
+            }
+            LogEntry::Unknown(range) => {
+                output.push(format!("{indent_str}UNKNOWN: {range:?} (maybe 
Page Index)"))
+            }
+            LogEntry::Event(event) => output.push(format!("Event: {event}")),
+        }
+    }
+}
+
+#[derive(Debug)]
+struct OperationLog {
+    /// The operations performed on the file
+    ops: Mutex<Vec<LogEntry>>,
+
+    /// Footer location in the parquet file
+    footer_location: Range<usize>,
+
+    /// Metadata location in the parquet file
+    metadata_location: Range<usize>,
+
+    /// Information about the row group layout in the parquet file, used to
+    /// translate read operations into human understandable IO operations
+    /// Path to the parquet file
+    row_groups: TestRowGroups,
+}
+
+impl OperationLog {
+    fn new(
+        footer_location: Range<usize>,
+        metadata_location: Range<usize>,
+        row_groups: TestRowGroups,
+    ) -> Self {
+        OperationLog {
+            ops: Mutex::new(Vec::new()),
+            metadata_location,
+            footer_location,
+            row_groups,
+        }
+    }
+
+    /// Add an operation to the log
+    fn add_entry(&self, entry: LogEntry) {
+        let mut ops = self.ops.lock().unwrap();
+        ops.push(entry);
+    }
+
+    /// Adds an entry to the operation log for the interesting object that is
+    /// accessed by the specified range
+    ///
+    /// This function checks the ranges in order against possible locations
+    /// and adds the appropriate operation to the log for the first match 
found.
+    fn add_entry_for_range(&self, range: &Range<usize>) {
+        self.add_entry(self.entry_for_range(range));
+    }
+
+    /// Adds entries to the operation log for each interesting object that is
+    /// accessed by the specified range
+    ///
+    /// It behaves the same as [`add_entry_for_range`] but for multiple ranges.
+    fn add_entry_for_ranges<'a>(&self, ranges: impl IntoIterator<Item = &'a 
Range<usize>>) {
+        let entries = ranges
+            .into_iter()
+            .map(|range| self.entry_for_range(range))
+            .collect::<Vec<_>>();
+        self.add_entry(LogEntry::ReadMultipleData(entries));
+    }
+
+    /// Create an appropriate LogEntry for the specified range
+    fn entry_for_range(&self, range: &Range<usize>) -> LogEntry {
+        let start = range.start as i64;
+        let end = range.end as i64;
+
+        // figure out what logical part of the file this range corresponds to
+        if self.metadata_location.contains(&range.start)
+            || self.metadata_location.contains(&(range.end - 1))
+        {
+            return LogEntry::ReadMetadata(range.clone());
+        }
+
+        if self.footer_location.contains(&range.start)
+            || self.footer_location.contains(&(range.end - 1))
+        {
+            return LogEntry::ReadFooter(range.clone());
+        }
+
+        // Search for the location in each column chunk.
+        //
+        // The actual parquet reader must in general decode the page headers
+        // and determine the byte ranges of the pages. However, for this test
+        // we assume the following layout:
+        //
+        // ```text
+        // (Dictionary Page)
+        // (Data Page)
+        // ...
+        // (Data Page)
+        // ```
+        //
+        // We also assume that `self.page_locations` holds the location of all
+        // data pages, so any read operation that overlaps with a data page
+        // location is considered a read of that page, and any other read must
+        // be a dictionary page read.
+        for (row_group_index, row_group) in self.row_groups.iter().enumerate() 
{
+            for (column_name, test_column_chunk) in &row_group.columns {
+                // Check if the range overlaps with any data page locations
+                let page_locations = test_column_chunk.page_locations.iter();
+
+                // What data pages does this range overlap with?
+                let mut data_page_indices = vec![];
+
+                for (data_page_index, page_location) in 
page_locations.enumerate() {
+                    let page_offset = page_location.offset;
+                    let page_end = page_offset + 
page_location.compressed_page_size as i64;
+
+                    // if the range fully contains the page, consider it a 
read of that page
+                    if start >= page_offset && end <= page_end {
+                        let read_info = ReadInfo {
+                            row_group_index,
+                            column_name: column_name.clone(),
+                            range: range.clone(),
+                            read_type: PageType::Data { data_page_index },
+                            num_requests: 1,
+                        };
+                        return LogEntry::ReadData(read_info);
+                    }
+
+                    // if the range overlaps with the page, add it to the list 
of overlapping pages
+                    if start < page_end && end > page_offset {
+                        data_page_indices.push(data_page_index);
+                    }
+                }
+
+                // was the dictionary page read?
+                let mut dictionary_page = false;
+
+                // Check if the range overlaps with the dictionary page 
location
+                if let Some(dict_page_offset) = 
test_column_chunk.dictionary_page_location {
+                    let dict_page_end = dict_page_offset + 
test_column_chunk.location.len() as i64;
+                    if start >= dict_page_offset && end < dict_page_end {
+                        let read_info = ReadInfo {
+                            row_group_index,
+                            column_name: column_name.clone(),
+                            range: range.clone(),
+                            read_type: PageType::Dictionary,
+                            num_requests: 1,
+                        };
+
+                        return LogEntry::ReadData(read_info);
+                    }
+
+                    // if the range overlaps with the dictionary page, add it 
to the list of overlapping pages
+                    if start < dict_page_end && end > dict_page_offset {
+                        dictionary_page = true;
+                    }
+                }
+
+                // If we can't find a page, but the range overlaps with the
+                // column chunk location, use the column chunk location
+                let column_byte_range = &test_column_chunk.location;
+                if column_byte_range.contains(&range.start)
+                    && column_byte_range.contains(&(range.end - 1))
+                {
+                    let read_data_entry = ReadInfo {
+                        row_group_index,
+                        column_name: column_name.clone(),
+                        range: range.clone(),
+                        read_type: PageType::Multi {
+                            data_page_indices,
+                            dictionary_page,
+                        },
+                        num_requests: 1,
+                    };
+
+                    return LogEntry::ReadData(read_data_entry);
+                }
+            }
+        }
+
+        // If we reach here, the range does not match any known logical part 
of the file
+        LogEntry::Unknown(range.clone())
+    }
+
+    // Combine entries in the log that are similar to reduce noise in the log.
+    fn coalesce_entries(&self) {
+        let mut ops = self.ops.lock().unwrap();
+
+        // Coalesce entries with the same read type
+        let prev_ops = std::mem::take(&mut *ops);
+        for entry in prev_ops {
+            let Some(last) = ops.last_mut() else {
+                ops.push(entry);
+                continue;
+            };
+
+            let LogEntry::ReadData(ReadInfo {
+                row_group_index: last_rg_index,
+                column_name: last_column_name,
+                range: last_range,
+                read_type: last_read_type,
+                num_requests: last_num_reads,
+            }) = last
+            else {
+                // If the last entry is not a ReadColumnChunk, just push it
+                ops.push(entry);
+                continue;
+            };
+
+            // If the entry is not a ReadColumnChunk, just push it
+            let LogEntry::ReadData(ReadInfo {
+                row_group_index,
+                column_name,
+                range,
+                read_type,
+                num_requests: num_reads,
+            }) = &entry
+            else {
+                ops.push(entry);
+                continue;
+            };
+
+            // Combine the entries if they are the same and this read is less 
than 10b.
+            //
+            // This heuristic is used to combine small reads (typically 1-2
+            // byte) made by the thrift decoder when reading the 
data/dictionary
+            // page headers.
+            if *row_group_index != *last_rg_index
+                || column_name != last_column_name
+                || read_type != last_read_type
+                || (range.start > last_range.end)
+                || (range.end < last_range.start)
+                || range.len() > 10
+            {
+                ops.push(entry);
+                continue;
+            }
+            // combine
+            *last_range = 
last_range.start.min(range.start)..last_range.end.max(range.end);
+            *last_num_reads += num_reads;
+        }
+    }
+
+    /// return a snapshot of the current operations in the log.
+    fn snapshot(&self) -> Vec<String> {
+        self.coalesce_entries();
+        let ops = self.ops.lock().unwrap();
+        let mut actual = vec![];
+        let indent = 0;
+        ops.iter()
+            .for_each(|s| s.append_string(&mut actual, indent));
+        actual
+    }
+}
diff --git a/parquet/tests/arrow_reader/io/sync_reader.rs 
b/parquet/tests/arrow_reader/io/sync_reader.rs
new file mode 100644
index 0000000000..685f251a9e
--- /dev/null
+++ b/parquet/tests/arrow_reader/io/sync_reader.rs
@@ -0,0 +1,443 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for the sync reader - [`ParquetRecordBatchReaderBuilder`]
+
+use crate::io::{
+    filter_a_175_b_625, filter_b_575_625, filter_b_false, test_file, 
test_options, LogEntry,
+    OperationLog, TestParquetFile,
+};
+
+use bytes::Bytes;
+use parquet::arrow::arrow_reader::{
+    ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, 
RowSelector,
+};
+use parquet::arrow::ProjectionMask;
+use parquet::file::reader::{ChunkReader, Length};
+use std::io::Read;
+use std::sync::Arc;
+
+#[test]
+fn test_read_entire_file() {
+    // read entire file without any filtering or projection
+    let test_file = test_file();
+    // Expect to see IO for all data pages for each row group and column
+    let builder = sync_builder(&test_file, test_options());
+    insta::assert_debug_snapshot!(run(&test_file, builder),
+        @r#"
+    [
+        "Footer: 8 bytes",
+        "Metadata: 1162",
+        "UNKNOWN: 22230..22877 (maybe Page Index)",
+        "Event: Builder Configured",
+        "Event: Reader Built",
+        "Row Group 0, column 'a': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'a': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 0, column 'a': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'a': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'a': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'a': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 0, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'b': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 0, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 0, column 'c': DictionaryPage   (7107 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'c': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 0, column 'c': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'c': DictionaryPage   (7217 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'c': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'c': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+    ]
+    "#);
+}
+
+#[test]
+fn test_read_single_group() {
+    let test_file = test_file();
+    let builder = sync_builder(&test_file, 
test_options()).with_row_groups(vec![1]); // read only second row group
+
+    // Expect to see only IO for Row Group 1. Should see no IO for Row Group 0.
+    insta::assert_debug_snapshot!(run(&test_file, builder),
+        @r#"
+    [
+        "Footer: 8 bytes",
+        "Metadata: 1162",
+        "UNKNOWN: 22230..22877 (maybe Page Index)",
+        "Event: Builder Configured",
+        "Event: Reader Built",
+        "Row Group 1, column 'a': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'a': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'a': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'c': DictionaryPage   (7217 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'c': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'c': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+    ]
+    "#);
+}
+
+#[test]
+fn test_read_single_column() {
+    let test_file = test_file();
+    let builder = sync_builder(&test_file, test_options());
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+    let builder = 
builder.with_projection(ProjectionMask::columns(&schema_descr, ["b"]));
+    // Expect to see only IO for column "b". Should see no IO for columns "a" 
or "c".
+    insta::assert_debug_snapshot!(run(&test_file, builder),
+        @r#"
+    [
+        "Footer: 8 bytes",
+        "Metadata: 1162",
+        "UNKNOWN: 22230..22877 (maybe Page Index)",
+        "Event: Builder Configured",
+        "Event: Reader Built",
+        "Row Group 0, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'b': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 0, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+    ]
+    "#);
+}
+
+#[test]
+fn test_read_single_column_no_page_index() {
+    let test_file = test_file();
+    let options = test_options().with_page_index(false);
+    let builder = sync_builder(&test_file, options);
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+    let builder = 
builder.with_projection(ProjectionMask::columns(&schema_descr, ["b"]));
+    // Expect to see only IO for column "b", should see no IO for columns "a" 
or "c".
+    //
+    // Note that we need to read all data page headers to find the pages for 
column b
+    // so there are many more small reads than in the test_read_single_column 
test above
+    insta::assert_debug_snapshot!(run(&test_file, builder),
+        @r#"
+    [
+        "Footer: 8 bytes",
+        "Metadata: 1162",
+        "Event: Builder Configured",
+        "Event: Reader Built",
+        "Row Group 0, column 'b': DictionaryPage   (17 bytes  , 17 requests) 
[header]",
+        "Row Group 0, column 'b': DictionaryPage   (1600 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'b': DataPage(0)      (20 bytes  , 20 requests) 
[header]",
+        "Row Group 0, column 'b': DataPage(0)      (93 bytes  , 1 requests) 
[data]",
+        "Row Group 0, column 'b': DataPage(1)      (20 bytes  , 20 requests) 
[header]",
+        "Row Group 0, column 'b': DataPage(1)      (106 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DictionaryPage   (17 bytes  , 17 requests) 
[header]",
+        "Row Group 1, column 'b': DictionaryPage   (1600 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(0)      (20 bytes  , 20 requests) 
[header]",
+        "Row Group 1, column 'b': DataPage(0)      (93 bytes  , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(1)      (20 bytes  , 20 requests) 
[header]",
+        "Row Group 1, column 'b': DataPage(1)      (106 bytes , 1 requests) 
[data]",
+    ]
+    "#);
+}
+
+#[test]
+fn test_read_row_selection() {
+    // There are 400 total rows spread across 4 data pages (100 rows each)
+    // select rows 175..225 (i.e. DataPage(1) of row group 0 and DataPage(0) 
of row group 1)
+    let test_file = test_file();
+    let builder = sync_builder(&test_file, test_options());
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+    let builder = builder
+        .with_projection(
+            // read both "a" and "b"
+            ProjectionMask::columns(&schema_descr, ["a", "b"]),
+        )
+        .with_row_selection(RowSelection::from(vec![
+            RowSelector::skip(175),
+            RowSelector::select(50),
+        ]));
+
+    // Expect to see only data IO for one page for each column for each row 
group
+    // Note the data page headers for all pages need to be read to find the 
correct pages
+    insta::assert_debug_snapshot!(run(&test_file, builder),
+        @r#"
+    [
+        "Footer: 8 bytes",
+        "Metadata: 1162",
+        "UNKNOWN: 22230..22877 (maybe Page Index)",
+        "Event: Builder Configured",
+        "Event: Reader Built",
+        "Row Group 0, column 'a': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'a': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 0, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'a': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'a': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+    ]
+    "#);
+}
+
+#[test]
+fn test_read_limit() {
+    // There are 400 total rows spread across 4 data pages (100 rows each)
+    // a limit of 125 rows should only fetch the first two data pages 
(DataPage(0) and DataPage(1)) from row group 0
+    let test_file = test_file();
+    let builder = sync_builder(&test_file, test_options());
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+    let builder = builder
+        .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
+        .with_limit(125);
+
+    insta::assert_debug_snapshot!(run(&test_file, builder),
+        @r#"
+    [
+        "Footer: 8 bytes",
+        "Metadata: 1162",
+        "UNKNOWN: 22230..22877 (maybe Page Index)",
+        "Event: Builder Configured",
+        "Event: Reader Built",
+        "Row Group 0, column 'a': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'a': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 0, column 'a': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+    ]
+    "#);
+}
+
+#[test]
+fn test_read_single_row_filter() {
+    // Values from column "b" range 400..799
+    // filter  "b" > 575 and < 625
+    // (last data page in Row Group 0 and first DataPage in Row Group 1)
+    let test_file = test_file();
+    let builder = sync_builder(&test_file, test_options());
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+    let builder = builder
+        .with_projection(
+            // read both "a" and "b"
+            ProjectionMask::columns(&schema_descr, ["a", "b"]),
+        )
+        // "b" > 575 and "b" < 625
+        .with_row_filter(filter_b_575_625(&schema_descr));
+
+    // Expect to see I/O for column b in both row groups and then reading just 
a
+    // single pages for a in each row group
+    //
+    // Note there is significant IO that happens during the construction of the
+    // reader (between "Builder Configured" and "Reader Built")
+    insta::assert_debug_snapshot!(run(&test_file, builder),
+        @r#"
+    [
+        "Footer: 8 bytes",
+        "Metadata: 1162",
+        "UNKNOWN: 22230..22877 (maybe Page Index)",
+        "Event: Builder Configured",
+        "Row Group 0, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'b': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 0, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Event: Reader Built",
+        "Row Group 0, column 'a': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'a': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 0, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'a': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'a': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+    ]
+    "#);
+}
+
+#[test]
+fn test_read_multiple_row_filter() {
+    // Values in column "a" range 0..399
+    // Values in column "b" range 400..799
+    // First filter: "a" > 175  (last data page in Row Group 0)
+    // Second filter: "b" < 625 (last data page in Row Group 0 and first 
DataPage in RowGroup 1)
+    // Read column "c"
+    let test_file = test_file();
+    let builder = sync_builder(&test_file, test_options());
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+    let builder = builder
+        .with_projection(
+            ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
+        )
+        // a > 175 and b < 625
+        .with_row_filter(filter_a_175_b_625(&schema_descr));
+
+    // Expect that we will see
+    // 1. IO for all pages of column A
+    // 2. IO for pages of column b that passed 1.
+    // 3. IO after reader is built only for column c
+    //
+    // Note there is significant IO that happens during the construction of the
+    // reader (between "Builder Configured" and "Reader Built")
+    insta::assert_debug_snapshot!(run(&test_file, builder),
+        @r#"
+    [
+        "Footer: 8 bytes",
+        "Metadata: 1162",
+        "UNKNOWN: 22230..22877 (maybe Page Index)",
+        "Event: Builder Configured",
+        "Row Group 0, column 'a': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'a': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 0, column 'a': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'a': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'a': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'a': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 0, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Event: Reader Built",
+        "Row Group 0, column 'c': DictionaryPage   (7107 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'c': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'c': DictionaryPage   (7217 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'c': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+    ]
+    "#);
+}
+
+#[test]
+fn test_read_single_row_filter_all() {
+    // Apply a filter that entirely filters out rows based on a predicate from 
one column
+    // should not read any data pages for any other column
+
+    let test_file = test_file();
+    let builder = sync_builder(&test_file, test_options());
+    let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
+
+    let builder = builder
+        .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"]))
+        .with_row_filter(filter_b_false(&schema_descr));
+
+    // Expect to see the Footer and Metadata, then I/O for column b
+    // in both row groups but then nothing for column "a"
+    // since the row filter entirely filters out all rows.
+    //
+    // Note that all IO that happens during the construction of the reader
+    // (between "Builder Configured" and "Reader Built")
+    insta::assert_debug_snapshot!(run(&test_file, builder),
+        @r#"
+    [
+        "Footer: 8 bytes",
+        "Metadata: 1162",
+        "UNKNOWN: 22230..22877 (maybe Page Index)",
+        "Event: Builder Configured",
+        "Row Group 0, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 0, column 'b': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 0, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DictionaryPage   (1617 bytes, 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(0)      (113 bytes , 1 requests) 
[data]",
+        "Row Group 1, column 'b': DataPage(1)      (126 bytes , 1 requests) 
[data]",
+        "Event: Reader Built",
+    ]
+    "#);
+}
+
+/// Return a [`ParquetRecordBatchReaderBuilder`] for reading this file
+fn sync_builder(
+    test_file: &TestParquetFile,
+    options: ArrowReaderOptions,
+) -> ParquetRecordBatchReaderBuilder<RecordingChunkReader> {
+    let reader = RecordingChunkReader {
+        inner: test_file.bytes().clone(),
+        ops: Arc::clone(test_file.ops()),
+    };
+    ParquetRecordBatchReaderBuilder::try_new_with_options(reader, options)
+        .expect("ParquetRecordBatchReaderBuilder")
+}
+
+/// build the reader, and read all batches from it, returning the recorded IO 
operations
+fn run(
+    test_file: &TestParquetFile,
+    builder: ParquetRecordBatchReaderBuilder<RecordingChunkReader>,
+) -> Vec<String> {
+    let ops = test_file.ops();
+    ops.add_entry(LogEntry::event("Builder Configured"));
+    let reader = builder.build().unwrap();
+    ops.add_entry(LogEntry::event("Reader Built"));
+    for batch in reader {
+        match batch {
+            Ok(_) => {}
+            Err(e) => panic!("Error reading batch: {e}"),
+        }
+    }
+    ops.snapshot()
+}
+
+/// Records IO operations on an in-memory chunk reader
+struct RecordingChunkReader {
+    inner: Bytes,
+    ops: Arc<OperationLog>,
+}
+
+impl Length for RecordingChunkReader {
+    fn len(&self) -> u64 {
+        self.inner.len() as u64
+    }
+}
+
+impl ChunkReader for RecordingChunkReader {
+    type T = RecordingStdIoReader;
+
+    fn get_read(&self, start: u64) -> parquet::errors::Result<Self::T> {
+        let reader = RecordingStdIoReader {
+            start: start as usize,
+            inner: self.inner.clone(),
+            ops: Arc::clone(&self.ops),
+        };
+        Ok(reader)
+    }
+
+    fn get_bytes(&self, start: u64, length: usize) -> 
parquet::errors::Result<Bytes> {
+        let start = start as usize;
+        let range = start..start + length;
+        self.ops.add_entry_for_range(&range);
+        Ok(self.inner.slice(start..start + length))
+    }
+}
+
+/// Wrapper around a `Bytes` object that implements `Read`
+struct RecordingStdIoReader {
+    /// current offset in the inner `Bytes` that this reader is reading from
+    start: usize,
+    inner: Bytes,
+    ops: Arc<OperationLog>,
+}
+
+impl Read for RecordingStdIoReader {
+    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+        let remain = self.inner.len() - self.start;
+        let start = self.start;
+        let read_length = buf.len().min(remain);
+        let read_range = start..start + read_length;
+
+        self.ops.add_entry_for_range(&read_range);
+
+        buf.copy_from_slice(self.inner.slice(read_range).as_ref());
+        // Update the inner position
+        self.start += read_length;
+        Ok(read_length)
+    }
+}
diff --git a/parquet/tests/arrow_reader/mod.rs 
b/parquet/tests/arrow_reader/mod.rs
index 8d72d1def1..510d627860 100644
--- a/parquet/tests/arrow_reader/mod.rs
+++ b/parquet/tests/arrow_reader/mod.rs
@@ -42,6 +42,7 @@ mod bad_data;
 #[cfg(feature = "crc")]
 mod checksum;
 mod int96_stats_roundtrip;
+mod io;
 #[cfg(feature = "async")]
 mod predicate_cache;
 mod statistics;
@@ -336,9 +337,9 @@ fn make_uint_batches(start: u8, end: u8) -> RecordBatch {
         Field::new("u64", DataType::UInt64, true),
     ]));
     let v8: Vec<u8> = (start..end).collect();
-    let v16: Vec<u16> = (start as _..end as _).collect();
-    let v32: Vec<u32> = (start as _..end as _).collect();
-    let v64: Vec<u64> = (start as _..end as _).collect();
+    let v16: Vec<u16> = (start as _..end as u16).collect();
+    let v32: Vec<u32> = (start as _..end as u32).collect();
+    let v64: Vec<u64> = (start as _..end as u64).collect();
     RecordBatch::try_new(
         schema,
         vec![

Reply via email to