This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 77a4c432dd Fix bug in handling of empty Parquet page index structures
(#8817)
77a4c432dd is described below
commit 77a4c432dd3709f0810952e34b1a76876ae13aeb
Author: Ed Seidl <[email protected]>
AuthorDate: Thu Nov 13 08:55:06 2025 -0800
Fix bug in handling of empty Parquet page index structures (#8817)
# Which issue does this PR close?
- Closes #8815.
# Rationale for this change
When writing Parquet metadata, sometimes the column and offset indexes
contain missing values (this is usually a side effect of the
`ParquetMetaData` not allowing for `None` in the page index structures).
This can lead to errors or panics.
# What changes are included in this PR?
Adds some checking in `ThriftMetadataWriter` to detect missing bits and
work around them.
# Are these changes tested?
Yes, new tests added.
# Are there any user-facing changes?
No
---
parquet/src/file/metadata/writer.rs | 169 +++++++++++++++++++++++++-----------
parquet/src/file/writer.rs | 71 +++++++++++++++
2 files changed, 187 insertions(+), 53 deletions(-)
diff --git a/parquet/src/file/metadata/writer.rs
b/parquet/src/file/metadata/writer.rs
index 5bb5f5cd85..38215f5ecd 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -114,38 +114,95 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
for (column_idx, column_metadata) in
row_group.columns.iter_mut().enumerate() {
if let Some(column_index) =
&column_indexes[row_group_idx][column_idx] {
let start_offset = self.buf.bytes_written();
- self.object_writer.write_column_index(
+ // only update column_metadata if the write succeeds
+ if self.object_writer.write_column_index(
column_index,
column_metadata,
row_group_idx,
column_idx,
&mut self.buf,
- )?;
- let end_offset = self.buf.bytes_written();
- // set offset and index for offset index
- column_metadata.column_index_offset = Some(start_offset as
i64);
- column_metadata.column_index_length = Some((end_offset -
start_offset) as i32);
+ )? {
+ let end_offset = self.buf.bytes_written();
+ // set offset and index for offset index
+ column_metadata.column_index_offset =
Some(start_offset as i64);
+ column_metadata.column_index_length =
+ Some((end_offset - start_offset) as i32);
+ }
}
}
}
Ok(())
}
- /// Assembles and writes the final metadata to self.buf
- pub fn finish(mut self) -> Result<ParquetMetaData> {
- let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
-
+ /// Serialize the column indexes and transform to
`Option<ParquetColumnIndex>`
+ fn finalize_column_indexes(&mut self) ->
Result<Option<ParquetColumnIndex>> {
let column_indexes = std::mem::take(&mut self.column_indexes);
- let offset_indexes = std::mem::take(&mut self.offset_indexes);
- // Write column indexes and offset indexes
+ // Write column indexes to file
if let Some(column_indexes) = column_indexes.as_ref() {
self.write_column_indexes(column_indexes)?;
}
+
+ // check to see if the index is `None` for every row group and column
chunk
+ let all_none = column_indexes
+ .as_ref()
+ .is_some_and(|ci| ci.iter().all(|cii| cii.iter().all(|idx|
idx.is_none())));
+
+ // transform from Option<Vec<Vec<Option<ColumnIndexMetaData>>>> to
+ // Option<Vec<Vec<ColumnIndexMetaData>>>
+ let column_indexes: Option<ParquetColumnIndex> = if all_none {
+ None
+ } else {
+ column_indexes.map(|ovvi| {
+ ovvi.into_iter()
+ .map(|vi| {
+ vi.into_iter()
+ .map(|ci| ci.unwrap_or(ColumnIndexMetaData::NONE))
+ .collect()
+ })
+ .collect()
+ })
+ };
+
+ Ok(column_indexes)
+ }
+
+ /// Serialize the offset indexes and transform to
`Option<ParquetOffsetIndex>`
+ fn finalize_offset_indexes(&mut self) ->
Result<Option<ParquetOffsetIndex>> {
+ let offset_indexes = std::mem::take(&mut self.offset_indexes);
+
+ // Write offset indexes to file
if let Some(offset_indexes) = offset_indexes.as_ref() {
self.write_offset_indexes(offset_indexes)?;
}
+ // check to see if the index is `None` for every row group and column
chunk
+ let all_none = offset_indexes
+ .as_ref()
+ .is_some_and(|oi| oi.iter().all(|oii| oii.iter().all(|idx|
idx.is_none())));
+
+ let offset_indexes: Option<ParquetOffsetIndex> = if all_none {
+ None
+ } else {
+ // FIXME(ets): this will panic if there's a missing index.
+ offset_indexes.map(|ovvi| {
+ ovvi.into_iter()
+ .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
+ .collect()
+ })
+ };
+
+ Ok(offset_indexes)
+ }
+
+ /// Assembles and writes the final metadata to self.buf
+ pub fn finish(mut self) -> Result<ParquetMetaData> {
+ let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
+
+ // serialize page indexes and transform to the proper form for use in
ParquetMetaData
+ let column_indexes = self.finalize_column_indexes()?;
+ let offset_indexes = self.finalize_offset_indexes()?;
+
// We only include ColumnOrder for leaf nodes.
// Currently only supported ColumnOrder is TypeDefinedOrder so we set
this
// for all leaf nodes.
@@ -220,34 +277,14 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
// unencrypted metadata before it is returned to users. This allows
the metadata
// to be usable for retrieving the row group statistics for example,
without users
// needing to decrypt the metadata.
- let mut builder = ParquetMetaDataBuilder::new(file_metadata);
-
- builder = match unencrypted_row_groups {
- Some(rg) => builder.set_row_groups(rg),
- None => builder.set_row_groups(row_groups),
- };
-
- let column_indexes: Option<ParquetColumnIndex> =
column_indexes.map(|ovvi| {
- ovvi.into_iter()
- .map(|vi| {
- vi.into_iter()
- .map(|oi| oi.unwrap_or(ColumnIndexMetaData::NONE))
- .collect()
- })
- .collect()
- });
+ let builder = ParquetMetaDataBuilder::new(file_metadata)
+ .set_column_index(column_indexes)
+ .set_offset_index(offset_indexes);
- // FIXME(ets): this will panic if there's a missing index.
- let offset_indexes: Option<ParquetOffsetIndex> =
offset_indexes.map(|ovvi| {
- ovvi.into_iter()
- .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
- .collect()
- });
-
- builder = builder.set_column_index(column_indexes);
- builder = builder.set_offset_index(offset_indexes);
-
- Ok(builder.build())
+ Ok(match unencrypted_row_groups {
+ Some(rg) => builder.set_row_groups(rg).build(),
+ None => builder.set_row_groups(row_groups).build(),
+ })
}
pub fn new(
@@ -495,11 +532,15 @@ impl MetadataObjectWriter {
#[cfg(not(feature = "encryption"))]
impl MetadataObjectWriter {
/// Write [`FileMetaData`] in Thrift format
+ ///
+ /// [`FileMetaData`]:
https://github.com/apache/parquet-format/tree/master?tab=readme-ov-file#metadata
fn write_file_metadata(&self, file_metadata: &FileMeta, sink: impl Write)
-> Result<()> {
Self::write_thrift_object(file_metadata, sink)
}
/// Write a column [`OffsetIndex`] in Thrift format
+ ///
+ /// [`OffsetIndex`]:
https://github.com/apache/parquet-format/blob/master/PageIndex.md
fn write_offset_index(
&self,
offset_index: &OffsetIndexMetaData,
@@ -512,6 +553,11 @@ impl MetadataObjectWriter {
}
/// Write a column [`ColumnIndex`] in Thrift format
+ ///
+ /// If `column_index` is [`ColumnIndexMetaData::NONE`] the index will not
be written and
+ /// this will return `false`. Returns `true` otherwise.
+ ///
+ /// [`ColumnIndex`]:
https://github.com/apache/parquet-format/blob/master/PageIndex.md
fn write_column_index(
&self,
column_index: &ColumnIndexMetaData,
@@ -519,8 +565,15 @@ impl MetadataObjectWriter {
_row_group_idx: usize,
_column_idx: usize,
sink: impl Write,
- ) -> Result<()> {
- Self::write_thrift_object(column_index, sink)
+ ) -> Result<bool> {
+ match column_index {
+ // Missing indexes may also have the placeholder
ColumnIndexMetaData::NONE
+ ColumnIndexMetaData::NONE => Ok(false),
+ _ => {
+ Self::write_thrift_object(column_index, sink)?;
+ Ok(true)
+ }
+ }
}
/// No-op implementation of row-group metadata encryption
@@ -598,6 +651,9 @@ impl MetadataObjectWriter {
/// Write a column [`ColumnIndex`] in Thrift format, possibly encrypting
it if required
///
+ /// If `column_index` is [`ColumnIndexMetaData::NONE`] the index will not
be written and
+ /// this will return `false`. Returns `true` otherwise.
+ ///
/// [`ColumnIndex`]:
https://github.com/apache/parquet-format/blob/master/PageIndex.md
fn write_column_index(
&self,
@@ -606,18 +662,25 @@ impl MetadataObjectWriter {
row_group_idx: usize,
column_idx: usize,
sink: impl Write,
- ) -> Result<()> {
- match &self.file_encryptor {
- Some(file_encryptor) => Self::write_thrift_object_with_encryption(
- column_index,
- sink,
- file_encryptor,
- column_chunk,
- ModuleType::ColumnIndex,
- row_group_idx,
- column_idx,
- ),
- None => Self::write_thrift_object(column_index, sink),
+ ) -> Result<bool> {
+ match column_index {
+ // Missing indexes may also have the placeholder
ColumnIndexMetaData::NONE
+ ColumnIndexMetaData::NONE => Ok(false),
+ _ => {
+ match &self.file_encryptor {
+ Some(file_encryptor) =>
Self::write_thrift_object_with_encryption(
+ column_index,
+ sink,
+ file_encryptor,
+ column_chunk,
+ ModuleType::ColumnIndex,
+ row_group_idx,
+ column_idx,
+ )?,
+ None => Self::write_thrift_object(column_index, sink)?,
+ }
+ Ok(true)
+ }
}
}
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index cbbcadf206..35948af022 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -1068,6 +1068,7 @@ mod tests {
use crate::schema::parser::parse_message_type;
use crate::schema::types;
use crate::schema::types::{ColumnDescriptor, ColumnPath};
+ use crate::util::test_common::file_util::get_test_file;
use crate::util::test_common::rand_gen::RandGen;
#[test]
@@ -2442,4 +2443,74 @@ mod tests {
start += 1;
}
}
+
+ #[test]
+ fn test_rewrite_no_page_indexes() {
+ let file = get_test_file("alltypes_tiny_pages.parquet");
+ let metadata = ParquetMetaDataReader::new()
+ .with_page_index_policy(PageIndexPolicy::Optional)
+ .parse_and_finish(&file)
+ .unwrap();
+
+ let props = Arc::new(WriterProperties::builder().build());
+ let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
+ let output = Vec::<u8>::new();
+ let mut writer = SerializedFileWriter::new(output, schema,
props).unwrap();
+
+ for rg in metadata.row_groups() {
+ let mut rg_out = writer.next_row_group().unwrap();
+ for column in rg.columns() {
+ let result = ColumnCloseResult {
+ bytes_written: column.compressed_size() as _,
+ rows_written: rg.num_rows() as _,
+ metadata: column.clone(),
+ bloom_filter: None,
+ column_index: None,
+ offset_index: None,
+ };
+ rg_out.append_column(&file, result).unwrap();
+ }
+ rg_out.close().unwrap();
+ }
+ writer.close().unwrap();
+ }
+
+ #[test]
+ fn test_rewrite_missing_column_index() {
+ // this file has an INT96 column that lacks a column index entry
+ let file = get_test_file("alltypes_tiny_pages.parquet");
+ let metadata = ParquetMetaDataReader::new()
+ .with_page_index_policy(PageIndexPolicy::Optional)
+ .parse_and_finish(&file)
+ .unwrap();
+
+ let props = Arc::new(WriterProperties::builder().build());
+ let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
+ let output = Vec::<u8>::new();
+ let mut writer = SerializedFileWriter::new(output, schema,
props).unwrap();
+
+ let column_indexes = metadata.column_index();
+ let offset_indexes = metadata.offset_index();
+
+ for (rg_idx, rg) in metadata.row_groups().iter().enumerate() {
+ let rg_column_indexes = column_indexes.and_then(|ci|
ci.get(rg_idx));
+ let rg_offset_indexes = offset_indexes.and_then(|oi|
oi.get(rg_idx));
+ let mut rg_out = writer.next_row_group().unwrap();
+ for (col_idx, column) in rg.columns().iter().enumerate() {
+ let column_index = rg_column_indexes.and_then(|row|
row.get(col_idx)).cloned();
+ let offset_index = rg_offset_indexes.and_then(|row|
row.get(col_idx)).cloned();
+ let result = ColumnCloseResult {
+ bytes_written: column.compressed_size() as _,
+ rows_written: rg.num_rows() as _,
+ metadata: column.clone(),
+ bloom_filter: None,
+ column_index,
+ offset_index,
+ };
+ rg_out.append_column(&file, result).unwrap();
+ }
+ rg_out.close().unwrap();
+ }
+ writer.close().unwrap();
+ }
}