This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 0b015991dd Parquet-concat: supports page index and bloom filter (#8811)
0b015991dd is described below
commit 0b015991dd665ac7cf64ad30aef6d709bc38900b
Author: mwish <[email protected]>
AuthorDate: Wed Nov 12 06:31:11 2025 +0800
Parquet-concat: supports page index and bloom filter (#8811)
# Which issue does this PR close?
Supports page indexes and bloom filters in `parquet-concat`.
- Closes #8804.
# Rationale for this change
`parquet-concat` previously wrote `None` for the bloom filter, column index, and offset index of every column chunk it copied, so these structures were dropped from the output. This change carries them over from the input files.
# What changes are included in this PR?
* Support copying page indexes and bloom filters in `parquet-concat`
* Expose `Sbbf::read_from_column_chunk` as a public API
# Are these changes tested?
Tested manually from the command line (e.g. `parquet-concat out.parquet a.parquet b.parquet`, file names illustrative); a read-back sketch is shown below.
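A minimal verification sketch, assuming illustrative file names and using only APIs that appear in this diff, to confirm the output carries the copied page indexes:

```rust
use parquet::errors::Result;
use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
use std::fs::File;

/// Returns true if the file at `path` carries both a column index and an offset index.
fn has_page_indexes(path: &str) -> Result<bool> {
    let file = File::open(path)?;
    // `Optional` loads the page indexes when present instead of erroring when absent
    let metadata = ParquetMetaDataReader::new()
        .with_page_index_policy(PageIndexPolicy::Optional)
        .parse_and_finish(&file)?;
    Ok(metadata.column_index().is_some() && metadata.offset_index().is_some())
}
```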
# Are there any user-facing changes?
Yes. `parquet-concat` now copies page indexes and bloom filters from the input files into the output when they are present; previously these structures were dropped.
---
parquet/src/bin/parquet-concat.rs | 33 ++++++++++++++++++++++++++-------
parquet/src/bloom_filter/mod.rs | 2 +-
2 files changed, 27 insertions(+), 8 deletions(-)
diff --git a/parquet/src/bin/parquet-concat.rs b/parquet/src/bin/parquet-concat.rs
index e8ce4ca1db..a6f1aef781 100644
--- a/parquet/src/bin/parquet-concat.rs
+++ b/parquet/src/bin/parquet-concat.rs
@@ -37,10 +37,12 @@
 //!
 
 use clap::Parser;
+use parquet::bloom_filter::Sbbf;
 use parquet::column::writer::ColumnCloseResult;
 use parquet::errors::{ParquetError, Result};
-use parquet::file::metadata::ParquetMetaDataReader;
+use parquet::file::metadata::{ColumnChunkMetaData, PageIndexPolicy, ParquetMetaDataReader};
 use parquet::file::properties::WriterProperties;
+use parquet::file::reader::ChunkReader;
 use parquet::file::writer::SerializedFileWriter;
 use std::fs::File;
 use std::sync::Arc;
@@ -56,6 +58,10 @@ struct Args {
     input: Vec<String>,
 }
 
+fn read_bloom_filter<R: ChunkReader>(column: &ColumnChunkMetaData, input: &R) -> Option<Sbbf> {
+    Sbbf::read_from_column_chunk(column, input).ok().flatten()
+}
+
 impl Args {
     fn run(&self) -> Result<()> {
         if self.input.is_empty() {
@@ -71,7 +77,10 @@ impl Args {
             .iter()
             .map(|x| {
                 let reader = File::open(x)?;
-                let metadata = ParquetMetaDataReader::new().parse_and_finish(&reader)?;
+                // Enable reading page indexes if present
+                let metadata = ParquetMetaDataReader::new()
+                    .with_page_index_policy(PageIndexPolicy::Optional)
+                    .parse_and_finish(&reader)?;
                 Ok((reader, metadata))
             })
             .collect::<Result<Vec<_>>>()?;
@@ -91,16 +100,26 @@ impl Args {
         let mut writer = SerializedFileWriter::new(output, schema, props)?;
 
         for (input, metadata) in inputs {
-            for rg in metadata.row_groups() {
+            let column_indexes = metadata.column_index();
+            let offset_indexes = metadata.offset_index();
+
+            for (rg_idx, rg) in metadata.row_groups().iter().enumerate() {
+                let rg_column_indexes = column_indexes.and_then(|ci| ci.get(rg_idx));
+                let rg_offset_indexes = offset_indexes.and_then(|oi| oi.get(rg_idx));
                 let mut rg_out = writer.next_row_group()?;
-                for column in rg.columns() {
+                for (col_idx, column) in rg.columns().iter().enumerate() {
+                    let bloom_filter = read_bloom_filter(column, &input);
+                    let column_index = rg_column_indexes.and_then(|row| row.get(col_idx)).cloned();
+
+                    let offset_index = rg_offset_indexes.and_then(|row| row.get(col_idx)).cloned();
+
                     let result = ColumnCloseResult {
                         bytes_written: column.compressed_size() as _,
                         rows_written: rg.num_rows() as _,
                         metadata: column.clone(),
-                        bloom_filter: None,
-                        column_index: None,
-                        offset_index: None,
+                        bloom_filter,
+                        column_index,
+                        offset_index,
                     };
                     rg_out.append_column(&input, result)?;
                 }
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 290a887b29..364928d366 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -333,7 +333,7 @@ impl Sbbf {
     }
 
     /// Read a new bloom filter from the given offset in the given reader.
-    pub(crate) fn read_from_column_chunk<R: ChunkReader>(
+    pub fn read_from_column_chunk<R: ChunkReader>(
        column_metadata: &ColumnChunkMetaData,
        reader: &R,
    ) -> Result<Option<Self>, ParquetError> {
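Since `read_from_column_chunk` is now public, downstream code can load a column chunk's bloom filter directly. A minimal sketch under that assumption (the helper function is illustrative, not part of this commit):

```rust
use parquet::bloom_filter::Sbbf;
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaDataReader;
use std::fs::File;

/// Read the bloom filter, if any, for one column chunk of the given file.
fn read_filter(path: &str, rg_idx: usize, col_idx: usize) -> Result<Option<Sbbf>> {
    let file = File::open(path)?;
    let metadata = ParquetMetaDataReader::new().parse_and_finish(&file)?;
    let column = metadata.row_group(rg_idx).column(col_idx);
    // Returns Ok(None) when the chunk has no bloom filter
    Sbbf::read_from_column_chunk(column, &file)
}
```

The returned `Sbbf` can then be probed (e.g. via `Sbbf::check`) to test whether a value may be present in the chunk.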