This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 77a4c432dd Fix bug in handling of empty Parquet page index structures 
(#8817)
77a4c432dd is described below

commit 77a4c432dd3709f0810952e34b1a76876ae13aeb
Author: Ed Seidl <[email protected]>
AuthorDate: Thu Nov 13 08:55:06 2025 -0800

    Fix bug in handling of empty Parquet page index structures (#8817)
    
    # Which issue does this PR close?
    
    - Closes #8815.
    
    # Rationale for this change
    
    When writing Parquet metadata, sometimes the column and offset indexes
    contain missing values (this is usually a side effect of the
    `ParquetMetaData` not allowing for `None` in the page index structures).
    This can lead to errors or panics.
    
    # What changes are included in this PR?
    
    Adds some checking in `ThriftMetadataWriter` to detect missing page index
    entries and work around them.
    
    # Are these changes tested?
    
    Yes, new tests added.
    
    # Are there any user-facing changes?
    
    No
---
 parquet/src/file/metadata/writer.rs | 169 +++++++++++++++++++++++++-----------
 parquet/src/file/writer.rs          |  71 +++++++++++++++
 2 files changed, 187 insertions(+), 53 deletions(-)

diff --git a/parquet/src/file/metadata/writer.rs 
b/parquet/src/file/metadata/writer.rs
index 5bb5f5cd85..38215f5ecd 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -114,38 +114,95 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
             for (column_idx, column_metadata) in 
row_group.columns.iter_mut().enumerate() {
                 if let Some(column_index) = 
&column_indexes[row_group_idx][column_idx] {
                     let start_offset = self.buf.bytes_written();
-                    self.object_writer.write_column_index(
+                    // only update column_metadata if the write succeeds
+                    if self.object_writer.write_column_index(
                         column_index,
                         column_metadata,
                         row_group_idx,
                         column_idx,
                         &mut self.buf,
-                    )?;
-                    let end_offset = self.buf.bytes_written();
-                    // set offset and index for offset index
-                    column_metadata.column_index_offset = Some(start_offset as 
i64);
-                    column_metadata.column_index_length = Some((end_offset - 
start_offset) as i32);
+                    )? {
+                        let end_offset = self.buf.bytes_written();
+                        // set offset and index for offset index
+                        column_metadata.column_index_offset = 
Some(start_offset as i64);
+                        column_metadata.column_index_length =
+                            Some((end_offset - start_offset) as i32);
+                    }
                 }
             }
         }
         Ok(())
     }
 
-    /// Assembles and writes the final metadata to self.buf
-    pub fn finish(mut self) -> Result<ParquetMetaData> {
-        let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
-
+    /// Serialize the column indexes and transform to 
`Option<ParquetColumnIndex>`
+    fn finalize_column_indexes(&mut self) -> 
Result<Option<ParquetColumnIndex>> {
         let column_indexes = std::mem::take(&mut self.column_indexes);
-        let offset_indexes = std::mem::take(&mut self.offset_indexes);
 
-        // Write column indexes and offset indexes
+        // Write column indexes to file
         if let Some(column_indexes) = column_indexes.as_ref() {
             self.write_column_indexes(column_indexes)?;
         }
+
+        // check to see if the index is `None` for every row group and column 
chunk
+        let all_none = column_indexes
+            .as_ref()
+            .is_some_and(|ci| ci.iter().all(|cii| cii.iter().all(|idx| 
idx.is_none())));
+
+        // transform from Option<Vec<Vec<Option<ColumnIndexMetaData>>>> to
+        // Option<Vec<Vec<ColumnIndexMetaData>>>
+        let column_indexes: Option<ParquetColumnIndex> = if all_none {
+            None
+        } else {
+            column_indexes.map(|ovvi| {
+                ovvi.into_iter()
+                    .map(|vi| {
+                        vi.into_iter()
+                            .map(|ci| ci.unwrap_or(ColumnIndexMetaData::NONE))
+                            .collect()
+                    })
+                    .collect()
+            })
+        };
+
+        Ok(column_indexes)
+    }
+
+    /// Serialize the offset indexes and transform to 
`Option<ParquetOffsetIndex>`
+    fn finalize_offset_indexes(&mut self) -> 
Result<Option<ParquetOffsetIndex>> {
+        let offset_indexes = std::mem::take(&mut self.offset_indexes);
+
+        // Write offset indexes to file
         if let Some(offset_indexes) = offset_indexes.as_ref() {
             self.write_offset_indexes(offset_indexes)?;
         }
 
+        // check to see if the index is `None` for every row group and column 
chunk
+        let all_none = offset_indexes
+            .as_ref()
+            .is_some_and(|oi| oi.iter().all(|oii| oii.iter().all(|idx| 
idx.is_none())));
+
+        let offset_indexes: Option<ParquetOffsetIndex> = if all_none {
+            None
+        } else {
+            // FIXME(ets): this will panic if there's a missing index.
+            offset_indexes.map(|ovvi| {
+                ovvi.into_iter()
+                    .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
+                    .collect()
+            })
+        };
+
+        Ok(offset_indexes)
+    }
+
+    /// Assembles and writes the final metadata to self.buf
+    pub fn finish(mut self) -> Result<ParquetMetaData> {
+        let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
+
+        // serialize page indexes and transform to the proper form for use in 
ParquetMetaData
+        let column_indexes = self.finalize_column_indexes()?;
+        let offset_indexes = self.finalize_offset_indexes()?;
+
         // We only include ColumnOrder for leaf nodes.
         // Currently only supported ColumnOrder is TypeDefinedOrder so we set 
this
         // for all leaf nodes.
@@ -220,34 +277,14 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
         // unencrypted metadata before it is returned to users. This allows 
the metadata
         // to be usable for retrieving the row group statistics for example, 
without users
         // needing to decrypt the metadata.
-        let mut builder = ParquetMetaDataBuilder::new(file_metadata);
-
-        builder = match unencrypted_row_groups {
-            Some(rg) => builder.set_row_groups(rg),
-            None => builder.set_row_groups(row_groups),
-        };
-
-        let column_indexes: Option<ParquetColumnIndex> = 
column_indexes.map(|ovvi| {
-            ovvi.into_iter()
-                .map(|vi| {
-                    vi.into_iter()
-                        .map(|oi| oi.unwrap_or(ColumnIndexMetaData::NONE))
-                        .collect()
-                })
-                .collect()
-        });
+        let builder = ParquetMetaDataBuilder::new(file_metadata)
+            .set_column_index(column_indexes)
+            .set_offset_index(offset_indexes);
 
-        // FIXME(ets): this will panic if there's a missing index.
-        let offset_indexes: Option<ParquetOffsetIndex> = 
offset_indexes.map(|ovvi| {
-            ovvi.into_iter()
-                .map(|vi| vi.into_iter().map(|oi| oi.unwrap()).collect())
-                .collect()
-        });
-
-        builder = builder.set_column_index(column_indexes);
-        builder = builder.set_offset_index(offset_indexes);
-
-        Ok(builder.build())
+        Ok(match unencrypted_row_groups {
+            Some(rg) => builder.set_row_groups(rg).build(),
+            None => builder.set_row_groups(row_groups).build(),
+        })
     }
 
     pub fn new(
@@ -495,11 +532,15 @@ impl MetadataObjectWriter {
 #[cfg(not(feature = "encryption"))]
 impl MetadataObjectWriter {
     /// Write [`FileMetaData`] in Thrift format
+    ///
+    /// [`FileMetaData`]: 
https://github.com/apache/parquet-format/tree/master?tab=readme-ov-file#metadata
     fn write_file_metadata(&self, file_metadata: &FileMeta, sink: impl Write) 
-> Result<()> {
         Self::write_thrift_object(file_metadata, sink)
     }
 
     /// Write a column [`OffsetIndex`] in Thrift format
+    ///
+    /// [`OffsetIndex`]: 
https://github.com/apache/parquet-format/blob/master/PageIndex.md
     fn write_offset_index(
         &self,
         offset_index: &OffsetIndexMetaData,
@@ -512,6 +553,11 @@ impl MetadataObjectWriter {
     }
 
     /// Write a column [`ColumnIndex`] in Thrift format
+    ///
+    /// If `column_index` is [`ColumnIndexMetaData::NONE`] the index will not 
be written and
+    /// this will return `false`. Returns `true` otherwise.
+    ///
+    /// [`ColumnIndex`]: 
https://github.com/apache/parquet-format/blob/master/PageIndex.md
     fn write_column_index(
         &self,
         column_index: &ColumnIndexMetaData,
@@ -519,8 +565,15 @@ impl MetadataObjectWriter {
         _row_group_idx: usize,
         _column_idx: usize,
         sink: impl Write,
-    ) -> Result<()> {
-        Self::write_thrift_object(column_index, sink)
+    ) -> Result<bool> {
+        match column_index {
+            // Missing indexes may also have the placeholder 
ColumnIndexMetaData::NONE
+            ColumnIndexMetaData::NONE => Ok(false),
+            _ => {
+                Self::write_thrift_object(column_index, sink)?;
+                Ok(true)
+            }
+        }
     }
 
     /// No-op implementation of row-group metadata encryption
@@ -598,6 +651,9 @@ impl MetadataObjectWriter {
 
     /// Write a column [`ColumnIndex`] in Thrift format, possibly encrypting 
it if required
     ///
+    /// If `column_index` is [`ColumnIndexMetaData::NONE`] the index will not 
be written and
+    /// this will return `false`. Returns `true` otherwise.
+    ///
     /// [`ColumnIndex`]: 
https://github.com/apache/parquet-format/blob/master/PageIndex.md
     fn write_column_index(
         &self,
@@ -606,18 +662,25 @@ impl MetadataObjectWriter {
         row_group_idx: usize,
         column_idx: usize,
         sink: impl Write,
-    ) -> Result<()> {
-        match &self.file_encryptor {
-            Some(file_encryptor) => Self::write_thrift_object_with_encryption(
-                column_index,
-                sink,
-                file_encryptor,
-                column_chunk,
-                ModuleType::ColumnIndex,
-                row_group_idx,
-                column_idx,
-            ),
-            None => Self::write_thrift_object(column_index, sink),
+    ) -> Result<bool> {
+        match column_index {
+            // Missing indexes may also have the placeholder 
ColumnIndexMetaData::NONE
+            ColumnIndexMetaData::NONE => Ok(false),
+            _ => {
+                match &self.file_encryptor {
+                    Some(file_encryptor) => 
Self::write_thrift_object_with_encryption(
+                        column_index,
+                        sink,
+                        file_encryptor,
+                        column_chunk,
+                        ModuleType::ColumnIndex,
+                        row_group_idx,
+                        column_idx,
+                    )?,
+                    None => Self::write_thrift_object(column_index, sink)?,
+                }
+                Ok(true)
+            }
         }
     }
 
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index cbbcadf206..35948af022 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -1068,6 +1068,7 @@ mod tests {
     use crate::schema::parser::parse_message_type;
     use crate::schema::types;
     use crate::schema::types::{ColumnDescriptor, ColumnPath};
+    use crate::util::test_common::file_util::get_test_file;
     use crate::util::test_common::rand_gen::RandGen;
 
     #[test]
@@ -2442,4 +2443,74 @@ mod tests {
             start += 1;
         }
     }
+
+    #[test]
+    fn test_rewrite_no_page_indexes() {
+        let file = get_test_file("alltypes_tiny_pages.parquet");
+        let metadata = ParquetMetaDataReader::new()
+            .with_page_index_policy(PageIndexPolicy::Optional)
+            .parse_and_finish(&file)
+            .unwrap();
+
+        let props = Arc::new(WriterProperties::builder().build());
+        let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
+        let output = Vec::<u8>::new();
+        let mut writer = SerializedFileWriter::new(output, schema, 
props).unwrap();
+
+        for rg in metadata.row_groups() {
+            let mut rg_out = writer.next_row_group().unwrap();
+            for column in rg.columns() {
+                let result = ColumnCloseResult {
+                    bytes_written: column.compressed_size() as _,
+                    rows_written: rg.num_rows() as _,
+                    metadata: column.clone(),
+                    bloom_filter: None,
+                    column_index: None,
+                    offset_index: None,
+                };
+                rg_out.append_column(&file, result).unwrap();
+            }
+            rg_out.close().unwrap();
+        }
+        writer.close().unwrap();
+    }
+
+    #[test]
+    fn test_rewrite_missing_column_index() {
+        // this file has an INT96 column that lacks a column index entry
+        let file = get_test_file("alltypes_tiny_pages.parquet");
+        let metadata = ParquetMetaDataReader::new()
+            .with_page_index_policy(PageIndexPolicy::Optional)
+            .parse_and_finish(&file)
+            .unwrap();
+
+        let props = Arc::new(WriterProperties::builder().build());
+        let schema = metadata.file_metadata().schema_descr().root_schema_ptr();
+        let output = Vec::<u8>::new();
+        let mut writer = SerializedFileWriter::new(output, schema, 
props).unwrap();
+
+        let column_indexes = metadata.column_index();
+        let offset_indexes = metadata.offset_index();
+
+        for (rg_idx, rg) in metadata.row_groups().iter().enumerate() {
+            let rg_column_indexes = column_indexes.and_then(|ci| 
ci.get(rg_idx));
+            let rg_offset_indexes = offset_indexes.and_then(|oi| 
oi.get(rg_idx));
+            let mut rg_out = writer.next_row_group().unwrap();
+            for (col_idx, column) in rg.columns().iter().enumerate() {
+                let column_index = rg_column_indexes.and_then(|row| 
row.get(col_idx)).cloned();
+                let offset_index = rg_offset_indexes.and_then(|row| 
row.get(col_idx)).cloned();
+                let result = ColumnCloseResult {
+                    bytes_written: column.compressed_size() as _,
+                    rows_written: rg.num_rows() as _,
+                    metadata: column.clone(),
+                    bloom_filter: None,
+                    column_index,
+                    offset_index,
+                };
+                rg_out.append_column(&file, result).unwrap();
+            }
+            rg_out.close().unwrap();
+        }
+        writer.close().unwrap();
+    }
 }

Reply via email to