This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new d5ed6b9f8 Add `ThriftMetadataWriter` for writing Parquet metadata (#6197)
d5ed6b9f8 is described below

commit d5ed6b9f82482046b7744ea6d01e0f06d8a1c4f9
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Tue Aug 6 15:42:24 2024 -0500

    Add `ThriftMetadataWriter` for writing Parquet metadata (#6197)
    
    * bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041)
    
    * bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight`
    
    Signed-off-by: Bugen Zhao <[email protected]>
    
    * fix example tests
    
    Signed-off-by: Bugen Zhao <[email protected]>
    
    ---------
    
    Signed-off-by: Bugen Zhao <[email protected]>
    
    * Remove `impl<T: AsRef<[u8]>> From<T> for Buffer` that easily accidentally copies data (#6043)
    
    * deprecate auto copy, ask explicit reference
    
    * update comments
    
    * make cargo doc happy
    
    * Make display of interval types more pretty (#6006)
    
    * improve display for interval.
    
    * update test in pretty, and fix display problem.
    
    * tmp
    
    * fix tests in arrow-cast.
    
    * fix tests in pretty.
    
    * fix style.
    
    * Update snafu (#5930)
    
    * Update Parquet thrift generated structures (#6045)
    
    * update to latest thrift (as of 11 Jul 2024) from parquet-format
    
    * pass None for optional size statistics
    
    * escape HTML tags
    
    * don't need to escape brackets in arrays
    
    * Revert "Revert "Write Bloom filters between row groups instead of the end (#…" (#5933)
    
    This reverts commit 22e0b4432c9838f2536284015271d3de9a165135.
    
    * Revert "Update snafu (#5930)" (#6069)
    
    This reverts commit 756b1fb26d1702f36f446faf9bb40a4869c3e840.
    
    * Update pyo3 requirement from 0.21.1 to 0.22.1 (fixed) (#6075)
    
    * Update pyo3 requirement from 0.21.1 to 0.22.1
    
    Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit the latest version.
    - [Release notes](https://github.com/pyo3/pyo3/releases)
    - [Changelog](https://github.com/PyO3/pyo3/blob/main/CHANGELOG.md)
    - [Commits](https://github.com/pyo3/pyo3/compare/v0.21.1...v0.22.1)
    
    ---
    updated-dependencies:
    - dependency-name: pyo3
      dependency-type: direct:production
    ...
    
    Signed-off-by: dependabot[bot] <[email protected]>
    
    * refactor: remove deprecated `FromPyArrow::from_pyarrow`
    
    "GIL Refs" are being phased out.
    
    * chore: update `pyo3` in integration tests
    
    ---------
    
    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
    
    * remove repeated codes to make the codes more concise. (#6080)
    
    * Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)
    
    * update to latest thrift (as of 11 Jul 2024) from parquet-format
    
    * pass None for optional size statistics
    
    * escape HTML tags
    
    * don't need to escape brackets in arrays
    
    * add support for unencoded_byte_array_data_bytes
    
    * add comments
    
    * change sig of ColumnMetrics::update_variable_length_bytes()
    
    * rename ParquetOffsetIndex to OffsetSizeIndex
    
    * rename some functions
    
    * suggestion from review
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * add Default trait to ColumnMetrics as suggested in review
    
    * rename OffsetSizeIndex to OffsetIndexMetaData
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Update pyo3 requirement from 0.21.1 to 0.22.2 (#6085)
    
    Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit the latest version.
    - [Release notes](https://github.com/pyo3/pyo3/releases)
    - [Changelog](https://github.com/PyO3/pyo3/blob/v0.22.2/CHANGELOG.md)
    - [Commits](https://github.com/pyo3/pyo3/compare/v0.21.1...v0.22.2)
    
    ---
    updated-dependencies:
    - dependency-name: pyo3
      dependency-type: direct:production
    ...
    
    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
    
    * Deprecate read_page_locations() and simplify offset index in `ParquetMetaData` (#6095)
    
    * deprecate read_page_locations
    
    * add to_thrift() to OffsetIndexMetaData
    
    * Update parquet/src/column/writer/mod.rs
    
    Co-authored-by: Ed Seidl <[email protected]>
    
    * Upgrade protobuf definitions to flightsql 17.0 (#6133)
    
    * Update FlightSql.proto to version 17.0
    
    Adds new message CommandStatementIngest and removes `experimental` from
    other messages.
    
    * Regenerate flight sql protocol
    
    This upgrades the file to version 17.0 of the protobuf definition.
    
    * Add `ParquetMetadataWriter` allow ad-hoc encoding of `ParquetMetadata`
    
    * fix loading in test by etseidl
    
    Co-authored-by: Ed Seidl <[email protected]>
    
    * add rough equivalence test
    
    * one more check
    
    * make clippy happy
    
    * separate tests that require arrow into a separate module
    
    * add histograms to to_thrift()
    
    ---------
    
    Signed-off-by: Bugen Zhao <[email protected]>
    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: Bugen Zhao <[email protected]>
    Co-authored-by: Xiangpeng Hao <[email protected]>
    Co-authored-by: kamille <[email protected]>
    Co-authored-by: Jesse <[email protected]>
    Co-authored-by: Ed Seidl <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
    Co-authored-by: Marco Neumann <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
    Co-authored-by: Douglas Anderson <[email protected]>
    Co-authored-by: Ed Seidl <[email protected]>
---
 parquet/src/file/page_index/index.rs |  47 +++
 parquet/src/file/writer.rs           | 624 +++++++++++++++++++++++++++++------
 parquet/src/thrift.rs                |   1 +
 3 files changed, 574 insertions(+), 98 deletions(-)
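
For readers skimming the diff below, the footer layout that `ThriftMetadataWriter::finish` produces can be sketched without the parquet crate. The following is a minimal, standalone illustration and not the crate's code: `CountingWriter` is a hypothetical stand-in for `TrackedWrite`, `BlobLocation` and the two helper functions are invented for this example, and plain byte slices take the place of Thrift-encoded structures. It shows the two ideas in the patch: out-of-band blobs are written first while their offset and length are recorded, and the metadata payload is followed by its 4-byte little-endian length and the `PAR1` magic.

```rust
use std::io::{self, Write};

/// Magic bytes that terminate a Parquet file.
const PARQUET_MAGIC: [u8; 4] = *b"PAR1";

/// Hypothetical stand-in for the crate's `TrackedWrite`: a writer that
/// counts how many bytes have passed through it.
struct CountingWriter<W: Write> {
    inner: W,
    bytes_written: usize,
}

impl<W: Write> CountingWriter<W> {
    fn new(inner: W) -> Self {
        Self { inner, bytes_written: 0 }
    }

    fn bytes_written(&self) -> usize {
        self.bytes_written
    }
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.bytes_written += n;
        Ok(n)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}

/// Where one out-of-band blob landed in the output (illustrative type).
#[derive(Debug, Clone, Copy, PartialEq)]
struct BlobLocation {
    offset: i64,
    length: i32,
}

/// Writes every present blob and records its start offset and length.
/// `None` entries write nothing and yield `None`.
fn write_out_of_band<W: Write>(
    out: &mut CountingWriter<W>,
    blobs: &[Option<Vec<u8>>],
) -> io::Result<Vec<Option<BlobLocation>>> {
    let mut locations = Vec::with_capacity(blobs.len());
    for blob in blobs {
        match blob {
            Some(bytes) => {
                let start = out.bytes_written();
                out.write_all(bytes)?;
                let end = out.bytes_written();
                locations.push(Some(BlobLocation {
                    offset: start as i64,
                    length: (end - start) as i32,
                }));
            }
            None => locations.push(None),
        }
    }
    Ok(locations)
}

/// Writes the metadata payload, then its length as a little-endian u32,
/// then the magic bytes.
fn write_footer<W: Write>(out: &mut CountingWriter<W>, metadata: &[u8]) -> io::Result<()> {
    let start = out.bytes_written();
    out.write_all(metadata)?;
    let metadata_len = (out.bytes_written() - start) as u32;
    out.write_all(&metadata_len.to_le_bytes())?;
    out.write_all(&PARQUET_MAGIC)?;
    Ok(())
}
```

Calling `write_out_of_band` with `[Some(vec![1, 2, 3]), None, Some(vec![4, 5])]` on a fresh writer records locations (0, 3) and (3, 2) and skips the `None` entry, which mirrors how the patch leaves `column_index_offset` unset when a column has no index.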

diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs
index cebb602b3..0c23e4aa3 100644
--- a/parquet/src/file/page_index/index.rs
+++ b/parquet/src/file/page_index/index.rs
@@ -225,6 +225,53 @@ impl<T: ParquetValueType> NativeIndex<T> {
             boundary_order: index.boundary_order,
         })
     }
+
+    pub(crate) fn to_thrift(&self) -> ColumnIndex {
+        let min_values = self
+            .indexes
+            .iter()
+            .map(|x| x.min_bytes().map(|x| x.to_vec()))
+            .collect::<Option<Vec<_>>>()
+            .unwrap_or_else(|| vec![vec![]; self.indexes.len()]);
+
+        let max_values = self
+            .indexes
+            .iter()
+            .map(|x| x.max_bytes().map(|x| x.to_vec()))
+            .collect::<Option<Vec<_>>>()
+            .unwrap_or_else(|| vec![vec![]; self.indexes.len()]);
+
+        let null_counts = self
+            .indexes
+            .iter()
+            .map(|x| x.null_count())
+            .collect::<Option<Vec<_>>>();
+
+        // Concatenate page histograms into a single Option<Vec>
+        let repetition_level_histograms = self
+            .indexes
+            .iter()
+            .map(|x| x.repetition_level_histogram().map(|v| v.values()))
+            .collect::<Option<Vec<&[i64]>>>()
+            .map(|hists| hists.concat());
+
+        let definition_level_histograms = self
+            .indexes
+            .iter()
+            .map(|x| x.definition_level_histogram().map(|v| v.values()))
+            .collect::<Option<Vec<&[i64]>>>()
+            .map(|hists| hists.concat());
+
+        ColumnIndex::new(
+            self.indexes.iter().map(|x| x.min().is_none()).collect(),
+            min_values,
+            max_values,
+            self.boundary_order,
+            null_counts,
+            repetition_level_histograms,
+            definition_level_histograms,
+        )
+    }
 }
 
 #[cfg(test)]
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index cf383103d..c84d06a2c 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -19,6 +19,7 @@
 //! using row group writers and column writers respectively.
 
 use crate::bloom_filter::Sbbf;
+use crate::file::page_index::index::Index;
 use crate::format as parquet;
 use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
 use crate::thrift::TSerializable;
@@ -261,122 +262,41 @@ impl<W: Write + Send> SerializedFileWriter<W> {
         Ok(())
     }
 
-    /// Serialize all the offset index to the file
-    fn write_offset_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
-        // iter row group
-        // iter each column
-        // write offset index to the file
-        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
-            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
-                match &self.offset_indexes[row_group_idx][column_idx] {
-                    Some(offset_index) => {
-                        let start_offset = self.buf.bytes_written();
-                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
-                        offset_index.write_to_out_protocol(&mut protocol)?;
-                        let end_offset = self.buf.bytes_written();
-                        // set offset and index for offset index
-                        column_metadata.offset_index_offset = Some(start_offset as i64);
-                        column_metadata.offset_index_length =
-                            Some((end_offset - start_offset) as i32);
-                    }
-                    None => {}
-                }
-            }
-        }
-        Ok(())
-    }
-
-    /// Serialize all the column index to the file
-    fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
-        // iter row group
-        // iter each column
-        // write column index to the file
-        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
-            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
-                match &self.column_indexes[row_group_idx][column_idx] {
-                    Some(column_index) => {
-                        let start_offset = self.buf.bytes_written();
-                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
-                        column_index.write_to_out_protocol(&mut protocol)?;
-                        let end_offset = self.buf.bytes_written();
-                        // set offset and index for offset index
-                        column_metadata.column_index_offset = Some(start_offset as i64);
-                        column_metadata.column_index_length =
-                            Some((end_offset - start_offset) as i32);
-                    }
-                    None => {}
-                }
-            }
-        }
-        Ok(())
-    }
-
     /// Assembles and writes metadata at the end of the file.
     fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
         self.finished = true;
-        let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
 
         // write out any remaining bloom filters after all row groups
         for row_group in &mut self.row_groups {
             write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
         }
 
-        let mut row_groups = self
-            .row_groups
-            .as_slice()
-            .iter()
-            .map(|v| v.to_thrift())
-            .collect::<Vec<_>>();
-
-        // Write column indexes and offset indexes
-        self.write_column_indexes(&mut row_groups)?;
-        self.write_offset_indexes(&mut row_groups)?;
-
         let key_value_metadata = match self.props.key_value_metadata() {
             Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
             None if self.kv_metadatas.is_empty() => None,
             None => Some(self.kv_metadatas.clone()),
         };
 
-        // We only include ColumnOrder for leaf nodes.
-        // Currently only supported ColumnOrder is TypeDefinedOrder so we set this
-        // for all leaf nodes.
-        // Even if the column has an undefined sort order, such as INTERVAL, this
-        // is still technically the defined TYPEORDER so it should still be set.
-        let column_orders = (0..self.schema_descr().num_columns())
-            .map(|_| parquet::ColumnOrder::TYPEORDER(parquet::TypeDefinedOrder {}))
-            .collect();
-        // This field is optional, perhaps in cases where no min/max fields are set
-        // in any Statistics or ColumnIndex object in the whole file.
-        // But for simplicity we always set this field.
-        let column_orders = Some(column_orders);
+        let row_groups = self
+            .row_groups
+            .iter()
+            .map(|v| v.to_thrift())
+            .collect::<Vec<_>>();
 
-        let file_metadata = parquet::FileMetaData {
-            num_rows,
+        let mut encoder = ThriftMetadataWriter::new(
+            &mut self.buf,
+            &self.schema,
+            &self.descr,
             row_groups,
-            key_value_metadata,
-            version: self.props.writer_version().as_num(),
-            schema: types::to_thrift(self.schema.as_ref())?,
-            created_by: Some(self.props.created_by().to_owned()),
-            column_orders,
-            encryption_algorithm: None,
-            footer_signing_key_metadata: None,
-        };
-
-        // Write file metadata
-        let start_pos = self.buf.bytes_written();
-        {
-            let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
-            file_metadata.write_to_out_protocol(&mut protocol)?;
+            Some(self.props.created_by().to_string()),
+            self.props.writer_version().as_num(),
+        );
+        if let Some(key_value_metadata) = key_value_metadata {
+            encoder = encoder.with_key_value_metadata(key_value_metadata)
         }
-        let end_pos = self.buf.bytes_written();
-
-        // Write footer
-        let metadata_len = (end_pos - start_pos) as u32;
-
-        self.buf.write_all(&metadata_len.to_le_bytes())?;
-        self.buf.write_all(&PARQUET_MAGIC)?;
-        Ok(file_metadata)
+        encoder = encoder.with_column_indexes(&self.column_indexes);
+        encoder = encoder.with_offset_indexes(&self.offset_indexes);
+        encoder.finish()
     }
 
     #[inline]
@@ -810,6 +730,281 @@ impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
     }
 }
 
+/// Writes `crate::file::metadata` structures to a thrift encoded byte stream
+///
+/// This structure handles the details of writing the various parts of parquet
+/// metadata into a byte stream. It is used to write the metadata into a
+/// parquet file and can also write metadata into other locations (such as a
+/// store of bytes).
+///
+/// This is somewhat tricky because the metadata is not stored as a single inline
+/// thrift structure. It can have several "out of band" structures such as the OffsetIndex
+/// and BloomFilters which are stored separately and whose locations are stored as offsets
+struct ThriftMetadataWriter<'a, W: Write> {
+    buf: &'a mut TrackedWrite<W>,
+    schema: &'a TypePtr,
+    schema_descr: &'a SchemaDescPtr,
+    row_groups: Vec<RowGroup>,
+    column_indexes: Option<&'a [Vec<Option<ColumnIndex>>]>,
+    offset_indexes: Option<&'a [Vec<Option<OffsetIndex>>]>,
+    key_value_metadata: Option<Vec<KeyValue>>,
+    created_by: Option<String>,
+    writer_version: i32,
+}
+
+impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
+    /// Serialize all the offset index to the file
+    fn write_offset_indexes(&mut self, offset_indexes: &[Vec<Option<OffsetIndex>>]) -> Result<()> {
+        // iter row group
+        // iter each column
+        // write offset index to the file
+        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
+            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
+                match &offset_indexes[row_group_idx][column_idx] {
+                    Some(offset_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                        offset_index.write_to_out_protocol(&mut protocol)?;
+                        let end_offset = self.buf.bytes_written();
+                        // set offset and length for offset index
+                        column_metadata.offset_index_offset = Some(start_offset as i64);
+                        column_metadata.offset_index_length =
+                            Some((end_offset - start_offset) as i32);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Serialize all the column index to the file
+    fn write_column_indexes(&mut self, column_indexes: &[Vec<Option<ColumnIndex>>]) -> Result<()> {
+        // iter row group
+        // iter each column
+        // write column index to the file
+        for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
+            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
+                match &column_indexes[row_group_idx][column_idx] {
+                    Some(column_index) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                        column_index.write_to_out_protocol(&mut protocol)?;
+                        let end_offset = self.buf.bytes_written();
+                        // set offset and length for column index
+                        column_metadata.column_index_offset = Some(start_offset as i64);
+                        column_metadata.column_index_length =
+                            Some((end_offset - start_offset) as i32);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Assembles and writes the final metadata to self.buf
+    pub fn finish(mut self) -> Result<parquet::FileMetaData> {
+        let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
+
+        // Write column indexes and offset indexes
+        if let Some(column_indexes) = self.column_indexes {
+            self.write_column_indexes(column_indexes)?;
+        }
+        if let Some(offset_indexes) = self.offset_indexes {
+            self.write_offset_indexes(offset_indexes)?;
+        }
+
+        // We only include ColumnOrder for leaf nodes.
+        // Currently only supported ColumnOrder is TypeDefinedOrder so we set this
+        // for all leaf nodes.
+        // Even if the column has an undefined sort order, such as INTERVAL, this
+        // is still technically the defined TYPEORDER so it should still be set.
+        let column_orders = (0..self.schema_descr.num_columns())
+            .map(|_| parquet::ColumnOrder::TYPEORDER(parquet::TypeDefinedOrder {}))
+            .collect();
+        // This field is optional, perhaps in cases where no min/max fields are set
+        // in any Statistics or ColumnIndex object in the whole file.
+        // But for simplicity we always set this field.
+        let column_orders = Some(column_orders);
+
+        let file_metadata = parquet::FileMetaData {
+            num_rows,
+            row_groups: self.row_groups,
+            key_value_metadata: self.key_value_metadata.clone(),
+            version: self.writer_version,
+            schema: types::to_thrift(self.schema.as_ref())?,
+            created_by: self.created_by.clone(),
+            column_orders,
+            encryption_algorithm: None,
+            footer_signing_key_metadata: None,
+        };
+
+        // Write file metadata
+        let start_pos = self.buf.bytes_written();
+        {
+            let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+            file_metadata.write_to_out_protocol(&mut protocol)?;
+        }
+        let end_pos = self.buf.bytes_written();
+
+        // Write footer
+        let metadata_len = (end_pos - start_pos) as u32;
+
+        self.buf.write_all(&metadata_len.to_le_bytes())?;
+        self.buf.write_all(&PARQUET_MAGIC)?;
+        Ok(file_metadata)
+    }
+
+    pub(self) fn new(
+        buf: &'a mut TrackedWrite<W>,
+        schema: &'a TypePtr,
+        schema_descr: &'a SchemaDescPtr,
+        row_groups: Vec<RowGroup>,
+        created_by: Option<String>,
+        writer_version: i32,
+    ) -> Self {
+        Self {
+            buf,
+            schema,
+            schema_descr,
+            row_groups,
+            column_indexes: None,
+            offset_indexes: None,
+            key_value_metadata: None,
+            created_by,
+            writer_version,
+        }
+    }
+
+    pub fn with_column_indexes(mut self, column_indexes: &'a [Vec<Option<ColumnIndex>>]) -> Self {
+        self.column_indexes = Some(column_indexes);
+        self
+    }
+
+    pub fn with_offset_indexes(mut self, offset_indexes: &'a [Vec<Option<OffsetIndex>>]) -> Self {
+        self.offset_indexes = Some(offset_indexes);
+        self
+    }
+
+    pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
+        self.key_value_metadata = Some(key_value_metadata);
+        self
+    }
+}
+
+pub struct ParquetMetadataWriter<'a, W: Write> {
+    buf: TrackedWrite<W>,
+    write_page_index: bool,
+    metadata: &'a ParquetMetaData,
+}
+
+impl<'a, W: Write> ParquetMetadataWriter<'a, W> {
+    pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
+        Self {
+            buf: TrackedWrite::new(buf),
+            write_page_index: true,
+            metadata,
+        }
+    }
+
+    pub fn write_page_index(&mut self, write_page_index: bool) -> &mut Self {
+        self.write_page_index = write_page_index;
+        self
+    }
+
+    pub fn finish(&mut self) -> Result<()> {
+        let file_metadata = self.metadata.file_metadata();
+
+        let schema = Arc::new(file_metadata.schema().clone());
+        let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
+        let created_by = file_metadata.created_by().map(str::to_string);
+
+        let row_groups = self
+            .metadata
+            .row_groups()
+            .iter()
+            .map(|rg| rg.to_thrift())
+            .collect::<Vec<_>>();
+
+        let key_value_metadata = file_metadata.key_value_metadata().cloned();
+
+        let column_indexes = self.convert_column_indexes();
+        let offset_indexes = self.convert_offset_index();
+
+        let mut encoder = ThriftMetadataWriter::new(
+            &mut self.buf,
+            &schema,
+            &schema_descr,
+            row_groups,
+            created_by,
+            file_metadata.version(),
+        );
+        encoder = encoder.with_column_indexes(&column_indexes);
+        encoder = encoder.with_offset_indexes(&offset_indexes);
+        if let Some(key_value_metadata) = key_value_metadata {
+            encoder = encoder.with_key_value_metadata(key_value_metadata);
+        }
+        encoder.finish()?;
+
+        Ok(())
+    }
+
+    fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
+        if let Some(row_group_column_indexes) = self.metadata.column_index() {
+            (0..self.metadata.row_groups().len())
+                .map(|rg_idx| {
+                    let column_indexes = &row_group_column_indexes[rg_idx];
+                    column_indexes
+                        .iter()
+                        .map(|column_index| match column_index {
+                            Index::NONE => None,
+                            Index::BOOLEAN(column_index) => Some(column_index.to_thrift()),
+                            Index::BYTE_ARRAY(column_index) => Some(column_index.to_thrift()),
+                            Index::DOUBLE(column_index) => Some(column_index.to_thrift()),
+                            Index::FIXED_LEN_BYTE_ARRAY(column_index) => {
+                                Some(column_index.to_thrift())
+                            }
+                            Index::FLOAT(column_index) => Some(column_index.to_thrift()),
+                            Index::INT32(column_index) => Some(column_index.to_thrift()),
+                            Index::INT64(column_index) => Some(column_index.to_thrift()),
+                            Index::INT96(column_index) => Some(column_index.to_thrift()),
+                        })
+                        .collect()
+                })
+                .collect()
+        } else {
+            // make a None for each row group, for each column
+            self.metadata
+                .row_groups()
+                .iter()
+                .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
+                .collect()
+        }
+    }
+
+    fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndex>>> {
+        if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
+            (0..self.metadata.row_groups().len())
+                .map(|rg_idx| {
+                    let offset_indexes = &row_group_offset_indexes[rg_idx];
+                    offset_indexes
+                        .iter()
+                        .map(|offset_index| Some(offset_index.to_thrift()))
+                        .collect()
+                })
+                .collect()
+        } else {
+            // make a None for each row group, for each column
+            self.metadata
+                .row_groups()
+                .iter()
+                .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
+                .collect()
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -2070,6 +2265,239 @@ mod tests {
         assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
     }
 
+    #[cfg(feature = "async")]
+    mod async_tests {
+        use std::sync::Arc;
+
+        use crate::file::footer::parse_metadata;
+        use crate::file::properties::{EnabledStatistics, WriterProperties};
+        use crate::file::reader::{FileReader, SerializedFileReader};
+        use crate::file::writer::ParquetMetadataWriter;
+        use crate::{
+            arrow::ArrowWriter,
+            file::{page_index::index::Index, serialized_reader::ReadOptionsBuilder},
+        };
+        use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+        use arrow_schema::{DataType as ArrowDataType, Field, Schema};
+        use bytes::{BufMut, Bytes, BytesMut};
+
+        use super::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData};
+
+        struct TestMetadata {
+            #[allow(dead_code)]
+            file_size: usize,
+            metadata: ParquetMetaData,
+        }
+
+        fn has_page_index(metadata: &ParquetMetaData) -> bool {
+            match metadata.column_index() {
+                Some(column_index) => column_index
+                    .iter()
+                    .any(|rg_idx| rg_idx.iter().all(|col_idx| !matches!(col_idx, Index::NONE))),
+                None => false,
+            }
+        }
+
+        #[test]
+        fn test_roundtrip_parquet_metadata_without_page_index() {
+            // We currently don't have an ad-hoc ParquetMetadata loader that can load page indexes so
+            // we at least test round trip without them
+            let metadata = get_test_metadata(false, false);
+            assert!(!has_page_index(&metadata.metadata));
+
+            let mut buf = BytesMut::new().writer();
+            {
+                let mut writer = ParquetMetadataWriter::new(&mut buf, &metadata.metadata);
+                writer.finish().unwrap();
+            }
+
+            let data = buf.into_inner().freeze();
+
+            let decoded_metadata = parse_metadata(&data).unwrap();
+            assert!(!has_page_index(&metadata.metadata));
+
+            assert_eq!(metadata.metadata, decoded_metadata);
+        }
+
+        fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> TestMetadata {
+            let mut buf = BytesMut::new().writer();
+            let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
+                "a",
+                ArrowDataType::Int32,
+                true,
+            )]));
+
+            let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+
+            let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+            let writer_props = match write_page_index {
+                true => WriterProperties::builder()
+                    .set_statistics_enabled(EnabledStatistics::Page)
+                    .build(),
+                false => WriterProperties::builder()
+                    .set_statistics_enabled(EnabledStatistics::Chunk)
+                    .build(),
+            };
+
+            let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
+            writer.write(&batch).unwrap();
+            writer.close().unwrap();
+
+            let data = buf.into_inner().freeze();
+
+            let reader_opts = match read_page_index {
+                true => ReadOptionsBuilder::new().with_page_index().build(),
+                false => ReadOptionsBuilder::new().build(),
+            };
+            let reader = SerializedFileReader::new_with_options(data.clone(), reader_opts).unwrap();
+            let metadata = reader.metadata().clone();
+            TestMetadata {
+                file_size: data.len(),
+                metadata,
+            }
+        }
+
+        /// Temporary function so we can test loading metadata with page indexes
+        /// while we haven't fully figured out how to load it cleanly
+        async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
+            use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
+            use crate::errors::Result as ParquetResult;
+            use bytes::Bytes;
+            use futures::future::BoxFuture;
+            use futures::FutureExt;
+            use std::ops::Range;
+
+            /// Adapt a `Bytes` to a `MetadataFetch` implementation.
+            struct AsyncBytes {
+                data: Bytes,
+            }
+
+            impl AsyncBytes {
+                fn new(data: Bytes) -> Self {
+                    Self { data }
+                }
+            }
+
+            impl MetadataFetch for AsyncBytes {
+                fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
+                    async move { Ok(self.data.slice(range.start..range.end)) }.boxed()
+                }
+            }
+
+            /// A `MetadataFetch` implementation that reads from a subset of the full data
+            /// while accepting ranges that address the full data.
+            struct MaskedBytes {
+                inner: Box<dyn MetadataFetch + Send>,
+                inner_range: Range<usize>,
+            }
+
+            impl MaskedBytes {
+                fn new(inner: Box<dyn MetadataFetch + Send>, inner_range: Range<usize>) -> Self {
+                    Self { inner, inner_range }
+                }
+            }
+
+            impl MetadataFetch for &mut MaskedBytes {
+                fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
+                    let inner_range = self.inner_range.clone();
+                    println!("inner_range: {:?}", inner_range);
+                    println!("range: {:?}", range);
+                    assert!(inner_range.start <= range.start && inner_range.end >= range.end);
+                    let range =
+                        range.start - self.inner_range.start..range.end - self.inner_range.start;
+                    self.inner.fetch(range)
+                }
+            }
+
+            let metadata_length = data.len();
+            let mut reader = MaskedBytes::new(
+                Box::new(AsyncBytes::new(data)),
+                file_size - metadata_length..file_size,
+            );
+            let metadata = MetadataLoader::load(&mut reader, file_size, None)
+                .await
+                .unwrap();
+            let loaded_metadata = metadata.finish();
+            let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
+            metadata.load_page_index(true, true).await.unwrap();
+            metadata.finish()
+        }
+
+        fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
+            assert_eq!(left.column_descr(), right.column_descr());
+            assert_eq!(left.encodings(), right.encodings());
+            assert_eq!(left.num_values(), right.num_values());
+            assert_eq!(left.compressed_size(), right.compressed_size());
+            assert_eq!(left.data_page_offset(), right.data_page_offset());
+            assert_eq!(left.statistics(), right.statistics());
+            assert_eq!(left.offset_index_length(), right.offset_index_length());
+            assert_eq!(left.column_index_length(), right.column_index_length());
+            assert_eq!(
+                left.unencoded_byte_array_data_bytes(),
+                right.unencoded_byte_array_data_bytes()
+            );
+        }
+
+        fn check_row_groups_are_equivalent(left: &RowGroupMetaData, right: &RowGroupMetaData) {
+            assert_eq!(left.num_rows(), right.num_rows());
+            assert_eq!(left.file_offset(), right.file_offset());
+            assert_eq!(left.total_byte_size(), right.total_byte_size());
+            assert_eq!(left.schema_descr(), right.schema_descr());
+            assert_eq!(left.num_columns(), right.num_columns());
+            left.columns()
+                .iter()
+                .zip(right.columns().iter())
+                .for_each(|(lc, rc)| {
+                    check_columns_are_equivalent(lc, rc);
+                });
+        }
+
+        #[tokio::test]
+        async fn test_encode_parquet_metadata_with_page_index() {
+            // Create a ParquetMetadata with page index information
+            let metadata = get_test_metadata(true, true);
+            assert!(has_page_index(&metadata.metadata));
+
+            let mut buf = BytesMut::new().writer();
+            {
+                let mut writer = ParquetMetadataWriter::new(&mut buf, &metadata.metadata);
+                writer.finish().unwrap();
+            }
+
+            let data = buf.into_inner().freeze();
+
+            let decoded_metadata = load_metadata_from_bytes(data.len(), data).await;
+
+            // Because the page index offsets will differ, compare invariant parts of the metadata
+            assert_eq!(
+                metadata.metadata.file_metadata(),
+                decoded_metadata.file_metadata()
+            );
+            assert_eq!(
+                metadata.metadata.column_index(),
+                decoded_metadata.column_index()
+            );
+            assert_eq!(
+                metadata.metadata.offset_index(),
+                decoded_metadata.offset_index()
+            );
+            assert_eq!(
+                metadata.metadata.num_row_groups(),
+                decoded_metadata.num_row_groups()
+            );
+
+            metadata
+                .metadata
+                .row_groups()
+                .iter()
+                .zip(decoded_metadata.row_groups().iter())
+                .for_each(|(left, right)| {
+                    check_row_groups_are_equivalent(left, right);
+                });
+        }
+    }
+
     #[test]
     #[cfg(feature = "arrow")]
     fn test_byte_stream_split_extended_roundtrip() {
diff --git a/parquet/src/thrift.rs b/parquet/src/thrift.rs
index ad6c3f688..abb2ac13c 100644
--- a/parquet/src/thrift.rs
+++ b/parquet/src/thrift.rs
@@ -17,6 +17,7 @@
 
 //! Custom thrift definitions
 
+pub use thrift::protocol::TCompactOutputProtocol;
 use thrift::protocol::{
     TFieldIdentifier, TInputProtocol, TListIdentifier, TMapIdentifier, TMessageIdentifier,
     TOutputProtocol, TSetIdentifier, TStructIdentifier, TType,

