crepererum commented on code in PR #7971: URL: https://github.com/apache/arrow-rs/pull/7971#discussion_r2260576148
########## parquet/tests/arrow_reader/io/async_reader.rs: ########## @@ -0,0 +1,434 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests for the async reader ([`ParquetRecordBatchStreamBuilder`]) + +use crate::io::{ + filter_a_175_b_625, filter_b_575_625, filter_b_false, test_file, test_options, LogEntry, + OperationLog, TestParquetFile, +}; +use bytes::Bytes; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt}; +use parquet::arrow::arrow_reader::{ArrowReaderOptions, RowSelection, RowSelector}; +use parquet::arrow::async_reader::AsyncFileReader; +use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask}; +use parquet::errors::Result; +use parquet::file::metadata::ParquetMetaData; +use std::ops::Range; +use std::sync::Arc; + +#[tokio::test] +async fn test_read_entire_file() { + // read entire file without any filtering or projection + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + run_test( + &test_file, + builder, + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 0, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 0, column 'c': MultiPage(dictionary_page: true, data_pages: [0, 1]) (7346 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'c': MultiPage(dictionary_page: true, data_pages: [0, 1]) (7456 bytes, 1 requests) [data]", + ] + ) + .await; +} + +#[tokio::test] +async fn test_read_single_group() { + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()) + .await + // read only second row group + .with_row_groups(vec![1]); + + // Expect to see only IO for Row Group 1. Should see no IO for Row Group 0. + run_test( + &test_file, + builder, + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 1, column 'a': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + " Row Group 1, column 'c': MultiPage(dictionary_page: true, data_pages: [0, 1]) (7456 bytes, 1 requests) [data]", + ] + ).await; +} + +#[tokio::test] +async fn test_read_single_column() { + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder.with_projection(ProjectionMask::columns(&schema_descr, ["b"])); + // Expect to see only IO for column "b". Should see no IO for columns "a" or "c". + run_test( + &test_file, + builder, + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'b': MultiPage(dictionary_page: true, data_pages: [0, 1]) (1856 bytes, 1 requests) [data]", + ] + ).await; +} + +#[tokio::test] +async fn test_read_row_selection() { + // There are 400 total rows spread across 4 data pages (100 rows each) + // select rows 175..225 (i.e. DataPage(1) of row group 0 and DataPage(0) of row group 1) + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a", "b"])) + .with_row_selection(RowSelection::from(vec![ + RowSelector::skip(175), + RowSelector::select(50), + ])); + + // Expect to see only data IO for one page for each column for each row group + run_test( + &test_file, + builder, + [ + "Get Provided Metadata", + "Event: Builder Configured", + "Event: Reader Built", + "Read Multi:", + " Row Group 0, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 0, column 'a': DataPage(1) (126 bytes , 1 requests) [data]", + " Row Group 0, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 0, column 'b': DataPage(1) (126 bytes , 1 requests) [data]", + "Read Multi:", + " Row Group 1, column 'a': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 1, column 'a': DataPage(0) (113 bytes , 1 requests) [data]", + " Row Group 1, column 'b': DictionaryPage (1617 bytes, 1 requests) [data]", + " Row Group 1, column 'b': DataPage(0) (113 bytes , 1 requests) [data]", + ], + ) + .await; +} + +#[tokio::test] +async fn test_read_limit() { + // There are 400 total rows spread across 4 data pages (100 rows each) + // a limit of 125 rows should only fetch the first two data pages (DataPage(0) and DataPage(1)) from row group 0 + let test_file = test_file(); + let builder = async_builder(&test_file, test_options()).await; + let schema_descr = builder.metadata().file_metadata().schema_descr_ptr(); + let builder = builder + .with_projection(ProjectionMask::columns(&schema_descr, ["a"])) + .with_limit(125); + + run_test( Review Comment: This looks a lot like a snapshot. These hardcoded strings will be a pain to update. Hence, could we use something like `insta` -- which is widely adopted by the Rust ecosystem incl. DataFusion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org