hhhizzz opened a new issue, #8565:
URL: https://github.com/apache/arrow-rs/issues/8565

   **Describe the bug**
   
   When a `RowFilter` is applied to a column that is **unsorted** and whose **page index provides little pruning**, the resulting `ReadPlan` becomes **highly fragmented**.
   It consists of a large number of tiny `select(N)` and `skip(M)` operations (often with `N = 1`), leading to severe performance degradation during decoding.
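
   For illustration, a fragmented selection of this shape can be written out with the public `RowSelector`/`RowSelection` types from the `parquet` crate. The concrete skip/select lengths below are made up; this is only meant to show the pattern the reader ends up executing:

    ```rust
    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

    fn main() {
        // A low-selectivity predicate on an unsorted column tends to yield a
        // selection that alternates between short skips and single-row selects.
        let selection = RowSelection::from(vec![
            RowSelector::skip(3),
            RowSelector::select(1),
            RowSelector::skip(7),
            RowSelector::select(1),
            RowSelector::skip(2),
            RowSelector::select(1),
        ]);
        // Every tiny run forces the decoder to stop, branch, and resume,
        // which is where the per-row overhead comes from.
        println!("{selection:?}");
    }
    ```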
   
   The current “**filter-during-decode**” design processes data row by row and incurs heavy CPU overhead from branching and state management.
   In extreme cases, filtering an unsorted column can be **5× slower** than scanning the entire file, even though it touches far fewer rows.
   
   ---
   
   **To Reproduce**
   
   The following self-contained Rust program reproduces the issue.
   Run in **Release** mode.
   
   ```rust
   // [dependencies]
   // parquet = { version = "56.2.0", features = ["async"] }
   // tokio = { version = "1.47.1", features = ["full"] }
   // arrow-array = "56.2.0"
   // arrow-schema = "56.2.0"
   // anyhow = "1.0.100"
   // arrow = "56.2.0"
   
   use std::{
       fs::File,
       io::Write,
       path::{Path, PathBuf},
       sync::Arc,
       time::{Duration, Instant},
   };
   use anyhow::Result;
   use arrow::{
        array::{ArrayRef, BooleanArray, BooleanBufferBuilder, Int32Array, RecordBatch},
       datatypes::{DataType, Field, Schema},
   };
   use arrow_array::builder::Int32Builder;
   use parquet::arrow::arrow_reader::{
        ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowFilter,
   };
   use parquet::arrow::{arrow_writer::ArrowWriter, ProjectionMask};
   use parquet::basic::Compression;
   use parquet::file::properties::WriterProperties;
   
   // ~400 MB target
   const TOTAL_ROWS: usize = 64_000_000;
   const ROWS_PER_ROW_GROUP: usize = 1_000_000;
   const OUTPUT_FILE: &str = "inventory.parquet";
   const IDX_QUANTITY: usize = 3;
   const LOWER: i32 = 100;
   const UPPER: i32 = 500;
   
    fn build_between_mask(col: &Int32Array, low: i32, high: i32) -> BooleanArray {
       let mut builder = BooleanBufferBuilder::new(col.len());
        for v in col.values().iter() {
           builder.append(*v >= low && *v <= high);
       }
       BooleanArray::new(builder.finish(), None)
   }
   
   #[tokio::main]
   async fn main() -> Result<()> {
       let path = PathBuf::from(OUTPUT_FILE);
       tokio::task::spawn_blocking({
           let p = path.clone();
           move || generate(&p, TOTAL_ROWS)
       })
       .await??;
   
       let (rows_push, t_push) = tokio::task::spawn_blocking({
           let p = path.clone();
           move || read_with_pushdown(&p)
       })
       .await??;
   
       let (rows_no, t_no) = tokio::task::spawn_blocking({
           let p = path.clone();
           move || read_full_scan(&p)
       })
       .await??;
   
        println!(
            "\n--- Results ---\nWith Pushdown: {rows_push} rows in {t_push:.2?}\nWithout Pushdown: {rows_no} rows in {t_no:.2?}"
        );
   
       let ratio = t_no.as_secs_f64() / t_push.as_secs_f64();
       println!("⏱️ Speedup (no_pushdown / pushdown): {ratio:.2}x");
       Ok(())
   }
   
   fn generate(path: &Path, total: usize) -> Result<()> {
       let schema = Arc::new(Schema::new(vec![
           Field::new("inv_date_sk", DataType::Int32, false),
           Field::new("inv_item_sk", DataType::Int32, false),
           Field::new("inv_warehouse_sk", DataType::Int32, false),
           Field::new("inv_quantity_on_hand", DataType::Int32, false),
       ]));
   
       let file = File::create(path)?;
       let props = WriterProperties::builder()
           .set_compression(Compression::UNCOMPRESSED)
           .build();
        let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props))?;
   
       for rg in 0..(total + ROWS_PER_ROW_GROUP - 1) / ROWS_PER_ROW_GROUP {
           let start = rg * ROWS_PER_ROW_GROUP;
           let len = (ROWS_PER_ROW_GROUP).min(total - start);
           let mut cols = [
               Int32Builder::with_capacity(len),
               Int32Builder::with_capacity(len),
               Int32Builder::with_capacity(len),
               Int32Builder::with_capacity(len),
           ];
           for i in 0..len {
               let idx = (start + i) as i32;
               cols[0].append_value(2_450_000 + (idx % 365));
               cols[1].append_value(1 + (idx % 200_000));
               cols[2].append_value(1 + (idx % 1_000));
            cols[3].append_value(((idx.wrapping_mul(73).wrapping_add(900)) % 1000).abs());
           }
           writer.write(&RecordBatch::try_new(
               schema.clone(),
            cols.into_iter().map(|mut b| Arc::new(b.finish()) as ArrayRef).collect(),
           )?)?;
           if (rg + 1) % 8 == 0 {
               print!(".");
               std::io::stdout().flush().ok();
           }
       }
       writer.close()?;
       Ok(())
   }
   
   fn read_with_pushdown(path: &Path) -> Result<(usize, Duration)> {
       let file = File::open(path)?;
       let options = ArrowReaderOptions::new().with_page_index(true);
        let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options)?;
        let mask = ProjectionMask::roots(builder.parquet_schema(), [IDX_QUANTITY]);
       let predicate = ArrowPredicateFn::new(mask, move |b: RecordBatch| {
           let col = b
               .column(0)
               .as_any()
               .downcast_ref::<Int32Array>()
               .unwrap();
           Ok(build_between_mask(col, LOWER, UPPER))
       });
       let reader = builder
           .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
           .with_projection(ProjectionMask::all())
           .build()?;
   
       let start = Instant::now();
       let mut rows = 0;
       for b in reader {
           rows += b?.num_rows();
       }
       Ok((rows, start.elapsed()))
   }
   
   fn read_full_scan(path: &Path) -> Result<(usize, Duration)> {
       let file = File::open(path)?;
        let reader = ParquetRecordBatchReaderBuilder::try_new_with_options(file, ArrowReaderOptions::new())?
            .with_projection(ProjectionMask::all())
            .build()?;
   
       let start = Instant::now();
       let mut count = 0;
        for b in reader {
           let batch = b?;
           let col = batch
               .column(IDX_QUANTITY)
               .as_any()
               .downcast_ref::<Int32Array>()
               .unwrap();
            count += col.values().iter().filter(|v| **v >= LOWER && **v <= UPPER).count();
       }
       Ok((count, start.elapsed()))
   }
   ```
   
   Example output on my MacBook:
   
   ```
   --- Results ---
   With Predicate Pushdown: 25664005 rows in 1.71s
   Without Pushdown       : 25664005 rows in 389.06ms
   ✅ Row counts match
   ⏱️ Speedup ≈ 0.23x
   ```
   
   By contrast, **Arrow C++** achieves a 1.35× speedup on the same dataset.
   ```
   --- Without Predicate Pushdown ---
   Found 25664005 rows.
   Time taken: 166.643 ms
   
   --- With Predicate Pushdown ---
   Found 25664005 rows.
   Time taken: 124.274 ms
   ```
   Sample code:
   ```cpp
   #include <iostream>
   #include <chrono>
   #include <memory>
   #include <utility> // Required for std::move
   
    // Arrow core headers
   #include "arrow/api.h"
   
   // Arrow Dataset API, for predicate pushdown
   #include "arrow/dataset/api.h"
   #include "arrow/dataset/file_parquet.h"
   #include "arrow/dataset/scanner.h"
   
   // Parquet file reader
   #include "parquet/arrow/reader.h"
   
   // For building filter expressions
   #include "arrow/compute/api.h"
   #include "arrow/compute/expression.h"
   
   // Helper function to check Arrow operation status
   void StatusCheck(const arrow::Status& status) {
       if (!status.ok()) {
           std::cerr << "Arrow Error: " << status.ToString() << std::endl;
           exit(EXIT_FAILURE);
       }
   }
   
   // Method 1: Without Predicate Pushdown (This function is updated)
   long long CountWithoutPredicatePushdown(const std::string& file_path) {
       long long count = 0;
   
       auto infile_result = arrow::io::ReadableFile::Open(file_path);
       StatusCheck(infile_result.status());
       std::shared_ptr<arrow::io::ReadableFile> infile = *infile_result;
   
       // 1. Call OpenFile without the output pointer. It now returns a Result.
        auto reader_result = parquet::arrow::OpenFile(infile, arrow::default_memory_pool());
       StatusCheck(reader_result.status());
       // 2. Move the unique_ptr out of the Result object.
        std::unique_ptr<parquet::arrow::FileReader> reader = std::move(*reader_result);
   
        int col_index = reader->parquet_reader()->metadata()->schema()->ColumnIndex("inv_quantity_on_hand");
       if (col_index < 0) {
           std::cerr << "Column 'inv_quantity_on_hand' not found." << std::endl;
           return -1;
       }
   
       int num_row_groups = reader->num_row_groups();
       for (int i = 0; i < num_row_groups; ++i) {
           std::shared_ptr<arrow::Table> table;
           StatusCheck(reader->ReadRowGroup(i, {col_index}, &table));
   
           auto col_chunks = table->column(0);
   
           for (const auto& chunk : col_chunks->chunks()) {
                auto int_array = std::static_pointer_cast<arrow::Int32Array>(chunk);
               for (int64_t j = 0; j < int_array->length(); ++j) {
                   if (!int_array->IsNull(j)) {
                       int32_t value = int_array->Value(j);
                       if (value >= 100 && value <= 500) {
                           count++;
                       }
                   }
               }
           }
       }
       return count;
   }
   
   // Method 2: With Predicate Pushdown (No changes needed here)
   long long CountWithPredicatePushdown(const std::string& file_path) {
       long long count = 0;
   
       std::string uri = "file://" + file_path;
   
       arrow::dataset::FileSystemFactoryOptions options;
       auto factory_result = arrow::dataset::FileSystemDatasetFactory::Make(
           uri, std::make_shared<arrow::dataset::ParquetFileFormat>(), options);
       StatusCheck(factory_result.status());
        std::shared_ptr<arrow::dataset::DatasetFactory> factory = *factory_result;
   
       auto dataset_result = factory->Finish();
       StatusCheck(dataset_result.status());
       std::shared_ptr<arrow::dataset::Dataset> dataset = *dataset_result;
   
        arrow::compute::Expression filter_expr = arrow::compute::call("and",
            {
                arrow::compute::greater_equal(arrow::compute::field_ref("inv_quantity_on_hand"), arrow::compute::literal(100)),
                arrow::compute::less_equal(arrow::compute::field_ref("inv_quantity_on_hand"), arrow::compute::literal(500))
            }
        );
   
       auto scanner_builder_result = dataset->NewScan();
       StatusCheck(scanner_builder_result.status());
        std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder = *scanner_builder_result;
   
       StatusCheck(scanner_builder->Filter(filter_expr));
       StatusCheck(scanner_builder->Project({}));
   
       auto scanner_result = scanner_builder->Finish();
       StatusCheck(scanner_result.status());
       std::shared_ptr<arrow::dataset::Scanner> scanner = *scanner_result;
   
       auto reader_result = scanner->ToRecordBatchReader();
       StatusCheck(reader_result.status());
       std::shared_ptr<arrow::RecordBatchReader> reader = *reader_result;
   
       std::shared_ptr<arrow::RecordBatch> batch;
       while (true) {
           StatusCheck(reader->ReadNext(&batch));
           if (!batch) {
               break;
           }
           count += batch->num_rows();
       }
   
       return count;
   }
   
   int main() {
       arrow::compute::Initialize();
        std::string file_path = "/Users/xunixhuang/RustroverProjects/parquet-learn/inventory.parquet";
   
       // --- Test without predicate pushdown ---
       auto start1 = std::chrono::high_resolution_clock::now();
       long long count1 = CountWithoutPredicatePushdown(file_path);
       auto end1 = std::chrono::high_resolution_clock::now();
       std::chrono::duration<double, std::milli> duration1 = end1 - start1;
   
       std::cout << "--- Without Predicate Pushdown ---" << std::endl;
       std::cout << "Found " << count1 << " rows." << std::endl;
       std::cout << "Time taken: " << duration1.count() << " ms" << std::endl;
       std::cout << std::endl;
   
       // --- Test with predicate pushdown ---
       auto start2 = std::chrono::high_resolution_clock::now();
       long long count2 = CountWithPredicatePushdown(file_path);
       auto end2 = std::chrono::high_resolution_clock::now();
       std::chrono::duration<double, std::milli> duration2 = end2 - start2;
   
       std::cout << "--- With Predicate Pushdown ---" << std::endl;
       std::cout << "Found " << count2 << " rows." << std::endl;
       std::cout << "Time taken: " << duration2.count() << " ms" << std::endl;
       std::cout << std::endl;
   
       if (count1 == count2) {
           std::cout << "✅ Results match!" << std::endl;
       } else {
           std::cout << "❌ Error: Results do not match!" << std::endl;
       }
   
       return 0;
   }
   ```
   ---
   
   **Expected behavior**
   
   Predicate pushdown should **not degrade performance**.
   Even if minimal pruning occurs, the runtime should be comparable to or better than a full scan, as in the Arrow C++ implementation.
   
   ---
   
   
   **Proposed Analysis & Solution**
   
   > https://github.com/apache/arrow-rs/blob/5993dffb714e47a720f810cc80b04176b591930a/parquet/src/arrow/arrow_reader/mod.rs#L1010
   
   The slowdown is caused by excessive per-row branching and state transitions in a fragmented `ReadPlan`.
   A possible **hybrid strategy** could mitigate this:
   
   1. **Analyze plan fragmentation** before decoding each page.
   2. If fragmentation is low, continue with the existing streaming “filter-during-decode” path.
   3. If fragmentation is high, **switch to “decode-then-filter”**: decode the full page into a temporary buffer and apply a vectorized filter kernel (see the sketch below).
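
   To make the idea concrete, here is a rough sketch of steps 1 and 3 in terms of the public `RowSelector` type and the `filter_record_batch` kernel. The helper names (`is_fragmented`, `apply_selection_vectorized`), the fragmentation metric, and the threshold are all invented for illustration; this is not how the reader is implemented internally, only the shape of the proposed fallback. It assumes the selectors cover exactly the rows of the decoded batch:

    ```rust
    use arrow::array::{BooleanArray, RecordBatch};
    use arrow::compute::filter_record_batch;
    use arrow::error::ArrowError;
    use parquet::arrow::arrow_reader::RowSelector;

    /// Hypothetical heuristic (step 1): treat the plan as "fragmented" when the
    /// average selected run is very short. The threshold of 8 rows is made up.
    fn is_fragmented(selectors: &[RowSelector]) -> bool {
        let mut selected_rows = 0usize;
        let mut select_runs = 0usize;
        for s in selectors.iter().filter(|s| !s.skip) {
            selected_rows += s.row_count;
            select_runs += 1;
        }
        select_runs > 0 && selected_rows / select_runs < 8
    }

    /// Hypothetical fallback (step 3): `batch` is a fully decoded page/batch and
    /// the selection is applied afterwards with a vectorized filter kernel
    /// instead of row-by-row select/skip during decoding.
    fn apply_selection_vectorized(
        batch: &RecordBatch,
        selectors: &[RowSelector],
    ) -> Result<RecordBatch, ArrowError> {
        // Expand the selectors into a boolean mask of length batch.num_rows().
        let mask: Vec<bool> = selectors
            .iter()
            .flat_map(|s| std::iter::repeat(!s.skip).take(s.row_count))
            .collect();
        filter_record_batch(batch, &BooleanArray::from(mask))
    }
    ```

   In this scheme the existing streaming path would remain the default; the vectorized fallback would only be chosen when `is_fragmented` (or whatever real metric the reader computes) indicates that per-row select/skip would dominate the cost.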
   
   This approach would preserve arrow-rs’s low-memory design in the common case while avoiding CPU blow-ups in pathological cases.
   
   ---
   
   **Questions for maintainers**
   
   * Are there design constraints (e.g., streaming semantics, async I/O) that make this approach difficult to integrate?
   
   Thank you for maintaining this excellent library.
   

