This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 0b015991dd Parquet-concat: supports page index and bloom filter (#8811)
0b015991dd is described below

commit 0b015991dd665ac7cf64ad30aef6d709bc38900b
Author: mwish <[email protected]>
AuthorDate: Wed Nov 12 06:31:11 2025 +0800

    Parquet-concat: supports page index and bloom filter (#8811)
    
    # Which issue does this PR close?
    
    Adds page index and bloom filter support to `parquet-concat`.
    
    - Closes #8804.
    
    # Rationale for this change
    
    Previously, `parquet-concat` dropped page indexes and bloom filters when concatenating files; this change preserves them in the output.
    
    # What changes are included in this PR?
    
    * Propagate page indexes and bloom filters through `parquet-concat`
    * Expose a public `Sbbf` read API (`Sbbf::read_from_column_chunk`); see the sketch below
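
    A minimal sketch of calling the newly public API (hypothetical file path and column choice; assumes that column chunk actually stores a bloom filter):

    ```rust
    use parquet::bloom_filter::Sbbf;
    use parquet::errors::Result;
    use parquet::file::metadata::ParquetMetaDataReader;
    use std::fs::File;

    fn main() -> Result<()> {
        // Hypothetical input path
        let file = File::open("input.parquet")?;
        let metadata = ParquetMetaDataReader::new().parse_and_finish(&file)?;
        // Bloom filter (if any) for the first column chunk of the first row group
        let column = metadata.row_group(0).column(0);
        if let Some(sbbf) = Sbbf::read_from_column_chunk(column, &file)? {
            // false means "definitely absent", true means "possibly present"
            println!("possibly present: {}", sbbf.check(&"some_value"));
        }
        Ok(())
    }
    ```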
    
    # Are these changes tested?
    
    Tested manually by running the command-line tool; a verification sketch follows.
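
    A hypothetical verification sketch (assumes the concatenated output is `out.parquet`; `PageIndexPolicy::Required` makes the read fail if page indexes are missing):

    ```rust
    use parquet::errors::Result;
    use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
    use std::fs::File;

    fn main() -> Result<()> {
        // Hypothetical output produced by parquet-concat
        let file = File::open("out.parquet")?;
        let metadata = ParquetMetaDataReader::new()
            // Fail the read if the file does not contain page indexes
            .with_page_index_policy(PageIndexPolicy::Required)
            .parse_and_finish(&file)?;
        assert!(metadata.column_index().is_some());
        assert!(metadata.offset_index().is_some());
        Ok(())
    }
    ```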
    
    # Are there any user-facing changes?
    
    Yes: `parquet-concat` now preserves page indexes and bloom filters from the input files instead of dropping them.
---
 parquet/src/bin/parquet-concat.rs | 33 ++++++++++++++++++++++++++-------
 parquet/src/bloom_filter/mod.rs   |  2 +-
 2 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/parquet/src/bin/parquet-concat.rs b/parquet/src/bin/parquet-concat.rs
index e8ce4ca1db..a6f1aef781 100644
--- a/parquet/src/bin/parquet-concat.rs
+++ b/parquet/src/bin/parquet-concat.rs
@@ -37,10 +37,12 @@
 //!
 
 use clap::Parser;
+use parquet::bloom_filter::Sbbf;
 use parquet::column::writer::ColumnCloseResult;
 use parquet::errors::{ParquetError, Result};
-use parquet::file::metadata::ParquetMetaDataReader;
+use parquet::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaDataReader};
 use parquet::file::properties::WriterProperties;
+use parquet::file::reader::ChunkReader;
 use parquet::file::writer::SerializedFileWriter;
 use std::fs::File;
 use std::sync::Arc;
@@ -56,6 +58,10 @@ struct Args {
     input: Vec<String>,
 }
 
+fn read_bloom_filter<R: ChunkReader>(column: &ColumnChunkMetaData, input: &R) -> Option<Sbbf> {
+    Sbbf::read_from_column_chunk(column, input).ok().flatten()
+}
+
 impl Args {
     fn run(&self) -> Result<()> {
         if self.input.is_empty() {
@@ -71,7 +77,10 @@ impl Args {
             .iter()
             .map(|x| {
                 let reader = File::open(x)?;
-                let metadata = ParquetMetaDataReader::new().parse_and_finish(&reader)?;
+                // Enable reading page indexes if present
+                let metadata = ParquetMetaDataReader::new()
+                    .with_page_index_policy(PageIndexPolicy::Optional)
+                    .parse_and_finish(&reader)?;
                 Ok((reader, metadata))
             })
             .collect::<Result<Vec<_>>>()?;
@@ -91,16 +100,26 @@ impl Args {
         let mut writer = SerializedFileWriter::new(output, schema, props)?;
 
         for (input, metadata) in inputs {
-            for rg in metadata.row_groups() {
+            let column_indexes = metadata.column_index();
+            let offset_indexes = metadata.offset_index();
+
+            for (rg_idx, rg) in metadata.row_groups().iter().enumerate() {
+                let rg_column_indexes = column_indexes.and_then(|ci| ci.get(rg_idx));
+                let rg_offset_indexes = offset_indexes.and_then(|oi| oi.get(rg_idx));
                 let mut rg_out = writer.next_row_group()?;
-                for column in rg.columns() {
+                for (col_idx, column) in rg.columns().iter().enumerate() {
+                    let bloom_filter = read_bloom_filter(column, &input);
+                    let column_index = rg_column_indexes.and_then(|row| row.get(col_idx)).cloned();
+
+                    let offset_index = rg_offset_indexes.and_then(|row| row.get(col_idx)).cloned();
+
                     let result = ColumnCloseResult {
                         bytes_written: column.compressed_size() as _,
                         rows_written: rg.num_rows() as _,
                         metadata: column.clone(),
-                        bloom_filter: None,
-                        column_index: None,
-                        offset_index: None,
+                        bloom_filter,
+                        column_index,
+                        offset_index,
                     };
                     rg_out.append_column(&input, result)?;
                 }
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 290a887b29..364928d366 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -333,7 +333,7 @@ impl Sbbf {
     }
 
     /// Read a new bloom filter from the given offset in the given reader.
-    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
+    pub fn read_from_column_chunk<R: ChunkReader>(
         column_metadata: &ColumnChunkMetaData,
         reader: &R,
     ) -> Result<Option<Self>, ParquetError> {
