This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new d5ed6b9f8 Add `ThriftMetadataWriter` for writing Parquet metadata (#6197)
d5ed6b9f8 is described below
commit d5ed6b9f82482046b7744ea6d01e0f06d8a1c4f9
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Tue Aug 6 15:42:24 2024 -0500
Add `ThriftMetadataWriter` for writing Parquet metadata (#6197)
* bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041)
* bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight`
Signed-off-by: Bugen Zhao <[email protected]>
* fix example tests
Signed-off-by: Bugen Zhao <[email protected]>
---------
Signed-off-by: Bugen Zhao <[email protected]>
* Remove `impl<T: AsRef<[u8]>> From<T> for Buffer` that easily accidentally copies data (#6043)
* deprecate auto copy; ask for an explicit reference
* update comments
* make cargo doc happy
* Make display of interval types more pretty (#6006)
* improve display for interval.
* update test in pretty, and fix display problem.
* tmp
* fix tests in arrow-cast.
* fix tests in pretty.
* fix style.
* Update snafu (#5930)
* Update Parquet thrift generated structures (#6045)
* update to latest thrift (as of 11 Jul 2024) from parquet-format
* pass None for optional size statistics
* escape HTML tags
* don't need to escape brackets in arrays
* Revert "Revert "Write Bloom filters between row groups instead of the end
(#…" (#5933)
This reverts commit 22e0b4432c9838f2536284015271d3de9a165135.
* Revert "Update snafu (#5930)" (#6069)
This reverts commit 756b1fb26d1702f36f446faf9bb40a4869c3e840.
* Update pyo3 requirement from 0.21.1 to 0.22.1 (fixed) (#6075)
* Update pyo3 requirement from 0.21.1 to 0.22.1
Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit the latest version.
- [Release notes](https://github.com/pyo3/pyo3/releases)
- [Changelog](https://github.com/PyO3/pyo3/blob/main/CHANGELOG.md)
- [Commits](https://github.com/pyo3/pyo3/compare/v0.21.1...v0.22.1)
---
updated-dependencies:
- dependency-name: pyo3
dependency-type: direct:production
...
Signed-off-by: dependabot[bot] <[email protected]>
* refactor: remove deprecated `FromPyArrow::from_pyarrow`
"GIL Refs" are being phased out.
* chore: update `pyo3` in integration tests
---------
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* remove repeated code to make the code more concise. (#6080)
* Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)
* update to latest thrift (as of 11 Jul 2024) from parquet-format
* pass None for optional size statistics
* escape HTML tags
* don't need to escape brackets in arrays
* add support for unencoded_byte_array_data_bytes
* add comments
* change sig of ColumnMetrics::update_variable_length_bytes()
* rename ParquetOffsetIndex to OffsetSizeIndex
* rename some functions
* suggestion from review
Co-authored-by: Andrew Lamb <[email protected]>
* add Default trait to ColumnMetrics as suggested in review
* rename OffsetSizeIndex to OffsetIndexMetaData
---------
Co-authored-by: Andrew Lamb <[email protected]>
* Update pyo3 requirement from 0.21.1 to 0.22.2 (#6085)
Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit the latest version.
- [Release notes](https://github.com/pyo3/pyo3/releases)
- [Changelog](https://github.com/PyO3/pyo3/blob/v0.22.2/CHANGELOG.md)
- [Commits](https://github.com/pyo3/pyo3/compare/v0.21.1...v0.22.2)
---
updated-dependencies:
- dependency-name: pyo3
dependency-type: direct:production
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Deprecate read_page_locations() and simplify offset index in `ParquetMetaData` (#6095)
* deprecate read_page_locations
* add to_thrift() to OffsetIndexMetaData
* Update parquet/src/column/writer/mod.rs
Co-authored-by: Ed Seidl <[email protected]>
* Upgrade protobuf definitions to flightsql 17.0 (#6133)
* Update FlightSql.proto to version 17.0
Adds new message CommandStatementIngest and removes `experimental` from other messages.
* Regenerate flight sql protocol
This upgrades the file to version 17.0 of the protobuf definition.
* Add `ParquetMetadataWriter` to allow ad-hoc encoding of `ParquetMetadata`
* fix loading in test by etseidl
Co-authored-by: Ed Seidl <[email protected]>
* add rough equivalence test
* one more check
* make clippy happy
* separate tests that require arrow into a separate module
* add histograms to to_thrift()
---------
Signed-off-by: Bugen Zhao <[email protected]>
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: Bugen Zhao <[email protected]>
Co-authored-by: Xiangpeng Hao <[email protected]>
Co-authored-by: kamille <[email protected]>
Co-authored-by: Jesse <[email protected]>
Co-authored-by: Ed Seidl <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Marco Neumann <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Douglas Anderson <[email protected]>
Co-authored-by: Ed Seidl <[email protected]>
---
parquet/src/file/page_index/index.rs | 47 +++
parquet/src/file/writer.rs | 624 +++++++++++++++++++++++++++++------
parquet/src/thrift.rs | 1 +
3 files changed, 574 insertions(+), 98 deletions(-)
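
A minimal usage sketch of the new `ParquetMetadataWriter`, assuming it is exported from `parquet::file::writer` as in the diff below; the hypothetical helper `write_metadata_to_buffer` mirrors the round-trip tests added in this commit:

    use bytes::{BufMut, Bytes, BytesMut};
    use parquet::file::reader::{FileReader, SerializedFileReader};
    use parquet::file::writer::ParquetMetadataWriter;

    // Re-encode the metadata of an existing Parquet file into a standalone buffer.
    fn write_metadata_to_buffer(file: std::fs::File) -> Bytes {
        let reader = SerializedFileReader::new(file).unwrap();
        let metadata = reader.metadata().clone();
        let mut buf = BytesMut::new().writer();
        {
            let mut writer = ParquetMetadataWriter::new(&mut buf, &metadata);
            writer.finish().unwrap();
        }
        buf.into_inner().freeze()
    }
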
diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs
index cebb602b3..0c23e4aa3 100644
--- a/parquet/src/file/page_index/index.rs
+++ b/parquet/src/file/page_index/index.rs
@@ -225,6 +225,53 @@ impl<T: ParquetValueType> NativeIndex<T> {
boundary_order: index.boundary_order,
})
}
+
+ pub(crate) fn to_thrift(&self) -> ColumnIndex {
+ let min_values = self
+ .indexes
+ .iter()
+ .map(|x| x.min_bytes().map(|x| x.to_vec()))
+ .collect::<Option<Vec<_>>>()
+ .unwrap_or_else(|| vec![vec![]; self.indexes.len()]);
+
+ let max_values = self
+ .indexes
+ .iter()
+ .map(|x| x.max_bytes().map(|x| x.to_vec()))
+ .collect::<Option<Vec<_>>>()
+ .unwrap_or_else(|| vec![vec![]; self.indexes.len()]);
+
+ let null_counts = self
+ .indexes
+ .iter()
+ .map(|x| x.null_count())
+ .collect::<Option<Vec<_>>>();
+
+ // Concatenate page histograms into a single Option<Vec>
+ let repetition_level_histograms = self
+ .indexes
+ .iter()
+ .map(|x| x.repetition_level_histogram().map(|v| v.values()))
+ .collect::<Option<Vec<&[i64]>>>()
+ .map(|hists| hists.concat());
+
+ let definition_level_histograms = self
+ .indexes
+ .iter()
+ .map(|x| x.definition_level_histogram().map(|v| v.values()))
+ .collect::<Option<Vec<&[i64]>>>()
+ .map(|hists| hists.concat());
+
+ ColumnIndex::new(
+ self.indexes.iter().map(|x| x.min().is_none()).collect(),
+ min_values,
+ max_values,
+ self.boundary_order,
+ null_counts,
+ repetition_level_histograms,
+ definition_level_histograms,
+ )
+ }
}
#[cfg(test)]
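
The `to_thrift()` above relies on collecting an iterator of `Option<T>` into `Option<Vec<T>>`, which is all-or-nothing: a single missing page value yields `None` for the whole column. A standalone illustration of that semantic:

    fn main() {
        let all = vec![Some(1i64), Some(2), Some(3)];
        let some_missing = vec![Some(1i64), None, Some(3)];
        // Every element present: Some(vec![...])
        assert_eq!(all.into_iter().collect::<Option<Vec<_>>>(), Some(vec![1, 2, 3]));
        // One None poisons the whole collection
        assert_eq!(some_missing.into_iter().collect::<Option<Vec<_>>>(), None);
    }
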
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index cf383103d..c84d06a2c 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -19,6 +19,7 @@
//! using row group writers and column writers respectively.
use crate::bloom_filter::Sbbf;
+use crate::file::page_index::index::Index;
use crate::format as parquet;
use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
use crate::thrift::TSerializable;
@@ -261,122 +262,41 @@ impl<W: Write + Send> SerializedFileWriter<W> {
Ok(())
}
- /// Serialize all the offset index to the file
- fn write_offset_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
- // iter row group
- // iter each column
- // write offset index to the file
- for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
- for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
- match &self.offset_indexes[row_group_idx][column_idx] {
- Some(offset_index) => {
- let start_offset = self.buf.bytes_written();
- let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
- offset_index.write_to_out_protocol(&mut protocol)?;
- let end_offset = self.buf.bytes_written();
- // set offset and index for offset index
- column_metadata.offset_index_offset = Some(start_offset as i64);
- column_metadata.offset_index_length =
- Some((end_offset - start_offset) as i32);
- }
- None => {}
- }
- }
- }
- Ok(())
- }
-
- /// Serialize all the column index to the file
- fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
- // iter row group
- // iter each column
- // write column index to the file
- for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
- for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
- match &self.column_indexes[row_group_idx][column_idx] {
- Some(column_index) => {
- let start_offset = self.buf.bytes_written();
- let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
- column_index.write_to_out_protocol(&mut protocol)?;
- let end_offset = self.buf.bytes_written();
- // set offset and index for offset index
- column_metadata.column_index_offset = Some(start_offset as i64);
- column_metadata.column_index_length =
- Some((end_offset - start_offset) as i32);
- }
- None => {}
- }
- }
- }
- Ok(())
- }
-
/// Assembles and writes metadata at the end of the file.
fn write_metadata(&mut self) -> Result<parquet::FileMetaData> {
self.finished = true;
- let num_rows = self.row_groups.iter().map(|x| x.num_rows()).sum();
// write out any remaining bloom filters after all row groups
for row_group in &mut self.row_groups {
write_bloom_filters(&mut self.buf, &mut self.bloom_filters, row_group)?;
}
- let mut row_groups = self
- .row_groups
- .as_slice()
- .iter()
- .map(|v| v.to_thrift())
- .collect::<Vec<_>>();
-
- // Write column indexes and offset indexes
- self.write_column_indexes(&mut row_groups)?;
- self.write_offset_indexes(&mut row_groups)?;
-
let key_value_metadata = match self.props.key_value_metadata() {
Some(kv) => Some(kv.iter().chain(&self.kv_metadatas).cloned().collect()),
None if self.kv_metadatas.is_empty() => None,
None => Some(self.kv_metadatas.clone()),
};
- // We only include ColumnOrder for leaf nodes.
- // Currently only supported ColumnOrder is TypeDefinedOrder so we set this
- // for all leaf nodes.
- // Even if the column has an undefined sort order, such as INTERVAL, this
- // is still technically the defined TYPEORDER so it should still be set.
- let column_orders = (0..self.schema_descr().num_columns())
- .map(|_| parquet::ColumnOrder::TYPEORDER(parquet::TypeDefinedOrder {}))
- .collect();
- // This field is optional, perhaps in cases where no min/max fields are set
- // in any Statistics or ColumnIndex object in the whole file.
- // But for simplicity we always set this field.
- let column_orders = Some(column_orders);
+ let row_groups = self
+ .row_groups
+ .iter()
+ .map(|v| v.to_thrift())
+ .collect::<Vec<_>>();
- let file_metadata = parquet::FileMetaData {
- num_rows,
+ let mut encoder = ThriftMetadataWriter::new(
+ &mut self.buf,
+ &self.schema,
+ &self.descr,
row_groups,
- key_value_metadata,
- version: self.props.writer_version().as_num(),
- schema: types::to_thrift(self.schema.as_ref())?,
- created_by: Some(self.props.created_by().to_owned()),
- column_orders,
- encryption_algorithm: None,
- footer_signing_key_metadata: None,
- };
-
- // Write file metadata
- let start_pos = self.buf.bytes_written();
- {
- let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
- file_metadata.write_to_out_protocol(&mut protocol)?;
+ Some(self.props.created_by().to_string()),
+ self.props.writer_version().as_num(),
+ );
+ if let Some(key_value_metadata) = key_value_metadata {
+ encoder = encoder.with_key_value_metadata(key_value_metadata)
}
- let end_pos = self.buf.bytes_written();
-
- // Write footer
- let metadata_len = (end_pos - start_pos) as u32;
-
- self.buf.write_all(&metadata_len.to_le_bytes())?;
- self.buf.write_all(&PARQUET_MAGIC)?;
- Ok(file_metadata)
+ encoder = encoder.with_column_indexes(&self.column_indexes);
+ encoder = encoder.with_offset_indexes(&self.offset_indexes);
+ encoder.finish()
}
#[inline]
@@ -810,6 +730,281 @@ impl<'a, W: Write + Send> PageWriter for SerializedPageWriter<'a, W> {
}
}
+/// Writes `crate::file::metadata` structures to a thrift-encoded byte stream
+///
+/// This structure handles the details of writing the various parts of parquet
+/// metadata into a byte stream. It is used to write the metadata into a
+/// parquet file and can also write metadata into other locations (such as a
+/// store of bytes).
+///
+/// This is somewhat tricky because the metadata is not stored as a single inline
+/// thrift structure. It can have several "out of band" structures, such as the OffsetIndex
+/// and BloomFilters, which are stored separately and whose locations are stored as offsets
+struct ThriftMetadataWriter<'a, W: Write> {
+ buf: &'a mut TrackedWrite<W>,
+ schema: &'a TypePtr,
+ schema_descr: &'a SchemaDescPtr,
+ row_groups: Vec<RowGroup>,
+ column_indexes: Option<&'a [Vec<Option<ColumnIndex>>]>,
+ offset_indexes: Option<&'a [Vec<Option<OffsetIndex>>]>,
+ key_value_metadata: Option<Vec<KeyValue>>,
+ created_by: Option<String>,
+ writer_version: i32,
+}
+
+impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
+ /// Serialize all the offset indexes to the file
+ fn write_offset_indexes(&mut self, offset_indexes: &[Vec<Option<OffsetIndex>>]) -> Result<()> {
+ // iter row group
+ // iter each column
+ // write offset index to the file
+ for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
+ for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
+ match &offset_indexes[row_group_idx][column_idx] {
+ Some(offset_index) => {
+ let start_offset = self.buf.bytes_written();
+ let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+ offset_index.write_to_out_protocol(&mut protocol)?;
+ let end_offset = self.buf.bytes_written();
+ // set offset and length for offset index
+ column_metadata.offset_index_offset = Some(start_offset as i64);
+ column_metadata.offset_index_length =
+ Some((end_offset - start_offset) as i32);
+ }
+ None => {}
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Serialize all the column indexes to the file
+ fn write_column_indexes(&mut self, column_indexes: &[Vec<Option<ColumnIndex>>]) -> Result<()> {
+ // iter row group
+ // iter each column
+ // write column index to the file
+ for (row_group_idx, row_group) in self.row_groups.iter_mut().enumerate() {
+ for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate() {
+ match &column_indexes[row_group_idx][column_idx] {
+ Some(column_index) => {
+ let start_offset = self.buf.bytes_written();
+ let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+ column_index.write_to_out_protocol(&mut protocol)?;
+ let end_offset = self.buf.bytes_written();
+ // set offset and length for column index
+ column_metadata.column_index_offset = Some(start_offset as i64);
+ column_metadata.column_index_length =
+ Some((end_offset - start_offset) as i32);
+ }
+ None => {}
+ }
+ }
+ }
+ Ok(())
+ }
+
+ /// Assembles and writes the final metadata to self.buf
+ pub fn finish(mut self) -> Result<parquet::FileMetaData> {
+ let num_rows = self.row_groups.iter().map(|x| x.num_rows).sum();
+
+ // Write column indexes and offset indexes
+ if let Some(column_indexes) = self.column_indexes {
+ self.write_column_indexes(column_indexes)?;
+ }
+ if let Some(offset_indexes) = self.offset_indexes {
+ self.write_offset_indexes(offset_indexes)?;
+ }
+
+ // We only include ColumnOrder for leaf nodes.
+ // Currently only supported ColumnOrder is TypeDefinedOrder so we set this
+ // for all leaf nodes.
+ // Even if the column has an undefined sort order, such as INTERVAL, this
+ // is still technically the defined TYPEORDER so it should still be set.
+ let column_orders = (0..self.schema_descr.num_columns())
+ .map(|_| parquet::ColumnOrder::TYPEORDER(parquet::TypeDefinedOrder {}))
+ .collect();
+ // This field is optional, perhaps in cases where no min/max fields are set
+ // in any Statistics or ColumnIndex object in the whole file.
+ // But for simplicity we always set this field.
+ let column_orders = Some(column_orders);
+
+ let file_metadata = parquet::FileMetaData {
+ num_rows,
+ row_groups: self.row_groups,
+ key_value_metadata: self.key_value_metadata.clone(),
+ version: self.writer_version,
+ schema: types::to_thrift(self.schema.as_ref())?,
+ created_by: self.created_by.clone(),
+ column_orders,
+ encryption_algorithm: None,
+ footer_signing_key_metadata: None,
+ };
+
+ // Write file metadata
+ let start_pos = self.buf.bytes_written();
+ {
+ let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+ file_metadata.write_to_out_protocol(&mut protocol)?;
+ }
+ let end_pos = self.buf.bytes_written();
+
+ // Write footer
+ let metadata_len = (end_pos - start_pos) as u32;
+
+ self.buf.write_all(&metadata_len.to_le_bytes())?;
+ self.buf.write_all(&PARQUET_MAGIC)?;
+ Ok(file_metadata)
+ }
+
+ pub(self) fn new(
+ buf: &'a mut TrackedWrite<W>,
+ schema: &'a TypePtr,
+ schema_descr: &'a SchemaDescPtr,
+ row_groups: Vec<RowGroup>,
+ created_by: Option<String>,
+ writer_version: i32,
+ ) -> Self {
+ Self {
+ buf,
+ schema,
+ schema_descr,
+ row_groups,
+ column_indexes: None,
+ offset_indexes: None,
+ key_value_metadata: None,
+ created_by,
+ writer_version,
+ }
+ }
+
+ pub fn with_column_indexes(mut self, column_indexes: &'a [Vec<Option<ColumnIndex>>]) -> Self {
+ self.column_indexes = Some(column_indexes);
+ self
+ }
+
+ pub fn with_offset_indexes(mut self, offset_indexes: &'a [Vec<Option<OffsetIndex>>]) -> Self {
+ self.offset_indexes = Some(offset_indexes);
+ self
+ }
+
+ pub fn with_key_value_metadata(mut self, key_value_metadata: Vec<KeyValue>) -> Self {
+ self.key_value_metadata = Some(key_value_metadata);
+ self
+ }
+}
+
+pub struct ParquetMetadataWriter<'a, W: Write> {
+ buf: TrackedWrite<W>,
+ write_page_index: bool,
+ metadata: &'a ParquetMetaData,
+}
+
+impl<'a, W: Write> ParquetMetadataWriter<'a, W> {
+ pub fn new(buf: W, metadata: &'a ParquetMetaData) -> Self {
+ Self {
+ buf: TrackedWrite::new(buf),
+ write_page_index: true,
+ metadata,
+ }
+ }
+
+ pub fn write_page_index(&mut self, write_page_index: bool) -> &mut Self {
+ self.write_page_index = write_page_index;
+ self
+ }
+
+ pub fn finish(&mut self) -> Result<()> {
+ let file_metadata = self.metadata.file_metadata();
+
+ let schema = Arc::new(file_metadata.schema().clone());
+ let schema_descr = Arc::new(SchemaDescriptor::new(schema.clone()));
+ let created_by = file_metadata.created_by().map(str::to_string);
+
+ let row_groups = self
+ .metadata
+ .row_groups()
+ .iter()
+ .map(|rg| rg.to_thrift())
+ .collect::<Vec<_>>();
+
+ let key_value_metadata = file_metadata.key_value_metadata().cloned();
+
+ let column_indexes = self.convert_column_indexes();
+ let offset_indexes = self.convert_offset_index();
+
+ let mut encoder = ThriftMetadataWriter::new(
+ &mut self.buf,
+ &schema,
+ &schema_descr,
+ row_groups,
+ created_by,
+ file_metadata.version(),
+ );
+ encoder = encoder.with_column_indexes(&column_indexes);
+ encoder = encoder.with_offset_indexes(&offset_indexes);
+ if let Some(key_value_metadata) = key_value_metadata {
+ encoder = encoder.with_key_value_metadata(key_value_metadata);
+ }
+ encoder.finish()?;
+
+ Ok(())
+ }
+
+ fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndex>>> {
+ if let Some(row_group_column_indexes) = self.metadata.column_index() {
+ (0..self.metadata.row_groups().len())
+ .map(|rg_idx| {
+ let column_indexes = &row_group_column_indexes[rg_idx];
+ column_indexes
+ .iter()
+ .map(|column_index| match column_index {
+ Index::NONE => None,
+ Index::BOOLEAN(column_index) => Some(column_index.to_thrift()),
+ Index::BYTE_ARRAY(column_index) => Some(column_index.to_thrift()),
+ Index::DOUBLE(column_index) => Some(column_index.to_thrift()),
+ Index::FIXED_LEN_BYTE_ARRAY(column_index) => {
+ Some(column_index.to_thrift())
+ }
+ Index::FLOAT(column_index) => Some(column_index.to_thrift()),
+ Index::INT32(column_index) => Some(column_index.to_thrift()),
+ Index::INT64(column_index) => Some(column_index.to_thrift()),
+ Index::INT96(column_index) => Some(column_index.to_thrift()),
+ })
+ .collect()
+ })
+ .collect()
+ } else {
+ // make a None for each row group, for each column
+ self.metadata
+ .row_groups()
+ .iter()
+ .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
+ .collect()
+ }
+ }
+
+ fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndex>>> {
+ if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
+ (0..self.metadata.row_groups().len())
+ .map(|rg_idx| {
+ let offset_indexes = &row_group_offset_indexes[rg_idx];
+ offset_indexes
+ .iter()
+ .map(|offset_index| Some(offset_index.to_thrift()))
+ .collect()
+ })
+ .collect()
+ } else {
+ // make a None for each row group, for each column
+ self.metadata
+ .row_groups()
+ .iter()
+ .map(|rg| std::iter::repeat(None).take(rg.columns().len()).collect())
+ .collect()
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -2070,6 +2265,239 @@ mod tests {
assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
}
+ #[cfg(feature = "async")]
+ mod async_tests {
+ use std::sync::Arc;
+
+ use crate::file::footer::parse_metadata;
+ use crate::file::properties::{EnabledStatistics, WriterProperties};
+ use crate::file::reader::{FileReader, SerializedFileReader};
+ use crate::file::writer::ParquetMetadataWriter;
+ use crate::{
+ arrow::ArrowWriter,
+ file::{page_index::index::Index, serialized_reader::ReadOptionsBuilder},
+ };
+ use arrow_array::{ArrayRef, Int32Array, RecordBatch};
+ use arrow_schema::{DataType as ArrowDataType, Field, Schema};
+ use bytes::{BufMut, Bytes, BytesMut};
+
+ use super::{ColumnChunkMetaData, ParquetMetaData, RowGroupMetaData};
+
+ struct TestMetadata {
+ #[allow(dead_code)]
+ file_size: usize,
+ metadata: ParquetMetaData,
+ }
+
+ fn has_page_index(metadata: &ParquetMetaData) -> bool {
+ match metadata.column_index() {
+ Some(column_index) => column_index
+ .iter()
+ .any(|rg_idx| rg_idx.iter().all(|col_idx| !matches!(col_idx, Index::NONE))),
+ None => false,
+ }
+ }
+
+ #[test]
+ fn test_roundtrip_parquet_metadata_without_page_index() {
+ // We currently don't have an ad-hoc ParquetMetadata loader that can load page indexes so
+ // we at least test round trip without them
+ let metadata = get_test_metadata(false, false);
+ assert!(!has_page_index(&metadata.metadata));
+
+ let mut buf = BytesMut::new().writer();
+ {
+ let mut writer = ParquetMetadataWriter::new(&mut buf, &metadata.metadata);
+ writer.finish().unwrap();
+ }
+
+ let data = buf.into_inner().freeze();
+
+ let decoded_metadata = parse_metadata(&data).unwrap();
+ assert!(!has_page_index(&decoded_metadata));
+
+ assert_eq!(metadata.metadata, decoded_metadata);
+ }
+
+ fn get_test_metadata(write_page_index: bool, read_page_index: bool) ->
TestMetadata {
+ let mut buf = BytesMut::new().writer();
+ let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
+ "a",
+ ArrowDataType::Int32,
+ true,
+ )]));
+
+ let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+
+ let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+ let writer_props = match write_page_index {
+ true => WriterProperties::builder()
+ .set_statistics_enabled(EnabledStatistics::Page)
+ .build(),
+ false => WriterProperties::builder()
+ .set_statistics_enabled(EnabledStatistics::Chunk)
+ .build(),
+ };
+
+ let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let data = buf.into_inner().freeze();
+
+ let reader_opts = match read_page_index {
+ true => ReadOptionsBuilder::new().with_page_index().build(),
+ false => ReadOptionsBuilder::new().build(),
+ };
+ let reader = SerializedFileReader::new_with_options(data.clone(), reader_opts).unwrap();
+ let metadata = reader.metadata().clone();
+ TestMetadata {
+ file_size: data.len(),
+ metadata,
+ }
+ }
+
+ /// Temporary function so we can test loading metadata with page indexes
+ /// while we haven't fully figured out how to load it cleanly
+ async fn load_metadata_from_bytes(file_size: usize, data: Bytes) ->
ParquetMetaData {
+ use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
+ use crate::errors::Result as ParquetResult;
+ use bytes::Bytes;
+ use futures::future::BoxFuture;
+ use futures::FutureExt;
+ use std::ops::Range;
+
+ /// Adapt a `Bytes` to a `MetadataFetch` implementation.
+ struct AsyncBytes {
+ data: Bytes,
+ }
+
+ impl AsyncBytes {
+ fn new(data: Bytes) -> Self {
+ Self { data }
+ }
+ }
+
+ impl MetadataFetch for AsyncBytes {
+ fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_,
ParquetResult<Bytes>> {
+ async move { Ok(self.data.slice(range.start..range.end))
}.boxed()
+ }
+ }
+
+ /// A `MetadataFetch` implementation that reads from a subset of the full data
+ /// while accepting ranges that address the full data.
+ struct MaskedBytes {
+ inner: Box<dyn MetadataFetch + Send>,
+ inner_range: Range<usize>,
+ }
+
+ impl MaskedBytes {
+ fn new(inner: Box<dyn MetadataFetch + Send>, inner_range:
Range<usize>) -> Self {
+ Self { inner, inner_range }
+ }
+ }
+
+ impl MetadataFetch for &mut MaskedBytes {
+ fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_,
ParquetResult<Bytes>> {
+ let inner_range = self.inner_range.clone();
+ println!("inner_range: {:?}", inner_range);
+ println!("range: {:?}", range);
+ assert!(inner_range.start <= range.start &&
inner_range.end >= range.end);
+ let range =
+ range.start - self.inner_range.start..range.end - self.inner_range.start;
+ self.inner.fetch(range)
+ }
+ }
+
+ let metadata_length = data.len();
+ let mut reader = MaskedBytes::new(
+ Box::new(AsyncBytes::new(data)),
+ file_size - metadata_length..file_size,
+ );
+ let metadata = MetadataLoader::load(&mut reader, file_size, None)
+ .await
+ .unwrap();
+ let loaded_metadata = metadata.finish();
+ let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
+ metadata.load_page_index(true, true).await.unwrap();
+ metadata.finish()
+ }
+
+ fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
+ assert_eq!(left.column_descr(), right.column_descr());
+ assert_eq!(left.encodings(), right.encodings());
+ assert_eq!(left.num_values(), right.num_values());
+ assert_eq!(left.compressed_size(), right.compressed_size());
+ assert_eq!(left.data_page_offset(), right.data_page_offset());
+ assert_eq!(left.statistics(), right.statistics());
+ assert_eq!(left.offset_index_length(), right.offset_index_length());
+ assert_eq!(left.column_index_length(), right.column_index_length());
+ assert_eq!(
+ left.unencoded_byte_array_data_bytes(),
+ right.unencoded_byte_array_data_bytes()
+ );
+ }
+
+ fn check_row_groups_are_equivalent(left: &RowGroupMetaData, right: &RowGroupMetaData) {
+ assert_eq!(left.num_rows(), right.num_rows());
+ assert_eq!(left.file_offset(), right.file_offset());
+ assert_eq!(left.total_byte_size(), right.total_byte_size());
+ assert_eq!(left.schema_descr(), right.schema_descr());
+ assert_eq!(left.num_columns(), right.num_columns());
+ left.columns()
+ .iter()
+ .zip(right.columns().iter())
+ .for_each(|(lc, rc)| {
+ check_columns_are_equivalent(lc, rc);
+ });
+ }
+
+ #[tokio::test]
+ async fn test_encode_parquet_metadata_with_page_index() {
+ // Create a ParquetMetadata with page index information
+ let metadata = get_test_metadata(true, true);
+ assert!(has_page_index(&metadata.metadata));
+
+ let mut buf = BytesMut::new().writer();
+ {
+ let mut writer = ParquetMetadataWriter::new(&mut buf, &metadata.metadata);
+ writer.finish().unwrap();
+ }
+
+ let data = buf.into_inner().freeze();
+
+ let decoded_metadata = load_metadata_from_bytes(data.len(), data).await;
+
+ // Because the page index offsets will differ, compare invariant parts of the metadata
+ assert_eq!(
+ metadata.metadata.file_metadata(),
+ decoded_metadata.file_metadata()
+ );
+ assert_eq!(
+ metadata.metadata.column_index(),
+ decoded_metadata.column_index()
+ );
+ assert_eq!(
+ metadata.metadata.offset_index(),
+ decoded_metadata.offset_index()
+ );
+ assert_eq!(
+ metadata.metadata.num_row_groups(),
+ decoded_metadata.num_row_groups()
+ );
+
+ metadata
+ .metadata
+ .row_groups()
+ .iter()
+ .zip(decoded_metadata.row_groups().iter())
+ .for_each(|(left, right)| {
+ check_row_groups_are_equivalent(left, right);
+ });
+ }
+ }
+
#[test]
#[cfg(feature = "arrow")]
fn test_byte_stream_split_extended_roundtrip() {
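
For context, `ThriftMetadataWriter::finish()` above terminates the stream with the standard Parquet footer: a 4-byte little-endian metadata length followed by the `PAR1` magic. A sketch of decoding that trailer from a byte slice (the helper name `decode_footer` is illustrative, not an API from this commit):

    /// Return (metadata_start, metadata_len) by decoding the 8-byte Parquet trailer.
    fn decode_footer(data: &[u8]) -> Option<(usize, usize)> {
        if data.len() < 8 || &data[data.len() - 4..] != b"PAR1" {
            return None; // missing or corrupt magic
        }
        let len_bytes: [u8; 4] = data[data.len() - 8..data.len() - 4].try_into().ok()?;
        let metadata_len = u32::from_le_bytes(len_bytes) as usize;
        // The encoded FileMetaData sits immediately before the 8-byte trailer
        let metadata_start = data.len().checked_sub(8 + metadata_len)?;
        Some((metadata_start, metadata_len))
    }
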
diff --git a/parquet/src/thrift.rs b/parquet/src/thrift.rs
index ad6c3f688..abb2ac13c 100644
--- a/parquet/src/thrift.rs
+++ b/parquet/src/thrift.rs
@@ -17,6 +17,7 @@
//! Custom thrift definitions
+pub use thrift::protocol::TCompactOutputProtocol;
use thrift::protocol::{
TFieldIdentifier, TInputProtocol, TListIdentifier, TMapIdentifier, TMessageIdentifier,
TOutputProtocol, TSetIdentifier, TStructIdentifier, TType,
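
The re-export above lets `ThriftMetadataWriter` serialize thrift structures while tracking where each one lands in the output. A minimal sketch of that offset/length bookkeeping pattern, using a hypothetical `CountingWriter` standing in for `TrackedWrite`:

    use std::io::Write;

    // Hypothetical stand-in for TrackedWrite: counts bytes as they pass through.
    struct CountingWriter<W: Write> {
        inner: W,
        written: usize,
    }

    impl<W: Write> Write for CountingWriter<W> {
        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
            let n = self.inner.write(buf)?;
            self.written += n;
            Ok(n)
        }
        fn flush(&mut self) -> std::io::Result<()> {
            self.inner.flush()
        }
    }

    // The pattern in write_offset_indexes/write_column_indexes: note the start
    // offset, serialize, note the end offset, and record (offset, length).
    fn write_tracked<W: Write>(w: &mut CountingWriter<W>, payload: &[u8]) -> std::io::Result<(i64, i32)> {
        let start = w.written;
        w.write_all(payload)?; // real code: index.write_to_out_protocol(&mut protocol)?
        let end = w.written;
        Ok((start as i64, (end - start) as i32))
    }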