This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 371ec57e3 Expose `SortingColumn` in parquet files (#3103)
371ec57e3 is described below
commit 371ec57e370e76e8b690d404d9ca7a86c07a73e4
Author: askoa <[email protected]>
AuthorDate: Tue Nov 15 14:34:37 2022 -0500
Expose `SortingColumn` in parquet files (#3103)
* Expose SortColumn from parquet file
* fix formatting issues
* empty commit
* fix PR comments
* formatting fix
* add parquet round trip test
* fix clippy error
* update the test based on PR comment
Co-authored-by: askoa <askoa@local>
---
parquet/src/file/metadata.rs | 21 +++++++++++++--
parquet/src/file/properties.rs | 16 +++++++++++
parquet/src/file/writer.rs | 61 ++++++++++++++++++++++++++++++++++++++++++
3 files changed, 96 insertions(+), 2 deletions(-)
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 0804890c2..895776a8a 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -37,7 +37,7 @@ use std::sync::Arc;
use crate::format::{
BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex,
PageLocation,
- RowGroup,
+ RowGroup, SortingColumn,
};
use crate::basic::{ColumnOrder, Compression, Encoding, Type};
@@ -229,6 +229,7 @@ pub type RowGroupMetaDataPtr = Arc<RowGroupMetaData>;
pub struct RowGroupMetaData {
columns: Vec<ColumnChunkMetaData>,
num_rows: i64,
+ sorting_columns: Option<Vec<SortingColumn>>,
total_byte_size: i64,
schema_descr: SchemaDescPtr,
page_offset_index: Option<Vec<Vec<PageLocation>>>,
@@ -260,6 +261,11 @@ impl RowGroupMetaData {
self.num_rows
}
+ /// Returns the sort ordering of the rows in this RowGroup if any
+ pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
+ self.sorting_columns.as_ref()
+ }
+
/// Total byte size of all uncompressed column data in this row group.
pub fn total_byte_size(&self) -> i64 {
self.total_byte_size
@@ -303,9 +309,11 @@ impl RowGroupMetaData {
let cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
columns.push(cc);
}
+ let sorting_columns = rg.sorting_columns;
Ok(RowGroupMetaData {
columns,
num_rows,
+ sorting_columns,
total_byte_size,
schema_descr,
page_offset_index: None,
@@ -318,7 +326,7 @@ impl RowGroupMetaData {
columns: self.columns().iter().map(|v| v.to_thrift()).collect(),
total_byte_size: self.total_byte_size,
num_rows: self.num_rows,
- sorting_columns: None,
+ sorting_columns: self.sorting_columns().cloned(),
file_offset: None,
total_compressed_size: None,
ordinal: None,
@@ -331,6 +339,7 @@ pub struct RowGroupMetaDataBuilder {
columns: Vec<ColumnChunkMetaData>,
schema_descr: SchemaDescPtr,
num_rows: i64,
+ sorting_columns: Option<Vec<SortingColumn>>,
total_byte_size: i64,
page_offset_index: Option<Vec<Vec<PageLocation>>>,
}
@@ -342,6 +351,7 @@ impl RowGroupMetaDataBuilder {
columns: Vec::with_capacity(schema_descr.num_columns()),
schema_descr,
num_rows: 0,
+ sorting_columns: None,
total_byte_size: 0,
page_offset_index: None,
}
@@ -353,6 +363,12 @@ impl RowGroupMetaDataBuilder {
self
}
+ /// Sets the sorting order for columns
+ pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> Self {
+ self.sorting_columns = value;
+ self
+ }
+
/// Sets total size in bytes for this row group.
pub fn set_total_byte_size(mut self, value: i64) -> Self {
self.total_byte_size = value;
@@ -384,6 +400,7 @@ impl RowGroupMetaDataBuilder {
Ok(RowGroupMetaData {
columns: self.columns,
num_rows: self.num_rows,
+ sorting_columns: self.sorting_columns,
total_byte_size: self.total_byte_size,
schema_descr: self.schema_descr,
page_offset_index: self.page_offset_index,
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index cf821df21..c65ba8035 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -69,6 +69,7 @@ use std::{collections::HashMap, sync::Arc};
use crate::basic::{Compression, Encoding};
use crate::compression::{CodecOptions, CodecOptionsBuilder};
use crate::file::metadata::KeyValue;
+use crate::format::SortingColumn;
use crate::schema::types::ColumnPath;
const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
@@ -121,6 +122,7 @@ pub struct WriterProperties {
pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
default_column_properties: ColumnProperties,
column_properties: HashMap<ColumnPath, ColumnProperties>,
+ sorting_columns: Option<Vec<SortingColumn>>,
}
impl WriterProperties {
@@ -182,6 +184,11 @@ impl WriterProperties {
self.key_value_metadata.as_ref()
}
+ /// Returns sorting columns.
+ pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
+ self.sorting_columns.as_ref()
+ }
+
/// Returns encoding for a data page, when dictionary encoding is enabled.
/// This is not configurable.
#[inline]
@@ -262,6 +269,7 @@ pub struct WriterPropertiesBuilder {
key_value_metadata: Option<Vec<KeyValue>>,
default_column_properties: ColumnProperties,
column_properties: HashMap<ColumnPath, ColumnProperties>,
+ sorting_columns: Option<Vec<SortingColumn>>,
}
impl WriterPropertiesBuilder {
@@ -278,6 +286,7 @@ impl WriterPropertiesBuilder {
key_value_metadata: None,
default_column_properties: ColumnProperties::new(),
column_properties: HashMap::new(),
+ sorting_columns: None,
}
}
@@ -294,6 +303,7 @@ impl WriterPropertiesBuilder {
key_value_metadata: self.key_value_metadata,
default_column_properties: self.default_column_properties,
column_properties: self.column_properties,
+ sorting_columns: self.sorting_columns,
}
}
@@ -370,6 +380,12 @@ impl WriterPropertiesBuilder {
self
}
+ /// Sets sorting order of rows in the row group if any
+ pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> Self {
+ self.sorting_columns = value;
+ self
+ }
+
// ----------------------------------------------------------------------
// Setters for any column (global)
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 2efaf7caf..b67bdccfe 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -434,6 +434,7 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
.set_column_metadata(column_chunks)
.set_total_byte_size(self.total_bytes_written as i64)
.set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
+ .set_sorting_columns(self.props.sorting_columns().cloned())
.build()?;
let metadata = Arc::new(row_group_metadata);
@@ -653,6 +654,7 @@ mod tests {
reader::{FileReader, SerializedFileReader, SerializedPageReader},
statistics::{from_thrift, to_thrift, Statistics},
};
+ use crate::format::SortingColumn;
use crate::record::RowAccessor;
use crate::schema::types::{ColumnDescriptor, ColumnPath};
use crate::util::memory::ByteBufferPtr;
@@ -844,6 +846,65 @@ mod tests {
assert_eq!(read_field, &field);
}
+ #[test]
+ fn test_file_writer_with_sorting_columns_metadata() {
+ let file = tempfile::tempfile().unwrap();
+
+ let schema = Arc::new(
+ types::Type::group_type_builder("schema")
+ .with_fields(&mut vec![
+ Arc::new(
+ types::Type::primitive_type_builder("col1", Type::INT32)
+ .build()
+ .unwrap(),
+ ),
+ Arc::new(
+ types::Type::primitive_type_builder("col2", Type::INT32)
+ .build()
+ .unwrap(),
+ ),
+ ])
+ .build()
+ .unwrap(),
+ );
+ let expected_result = Some(vec![SortingColumn {
+ column_idx: 0,
+ descending: false,
+ nulls_first: true,
+ }]);
+ let props = Arc::new(
+ WriterProperties::builder()
+ .set_key_value_metadata(Some(vec![KeyValue::new(
+ "key".to_string(),
+ "value".to_string(),
+ )]))
+ .set_sorting_columns(expected_result.clone())
+ .build(),
+ );
+ let mut writer =
+ SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
+ let mut row_group_writer = writer.next_row_group().expect("get row group writer");
+
+ let col_writer = row_group_writer.next_column().unwrap().unwrap();
+ col_writer.close().unwrap();
+
+ let col_writer = row_group_writer.next_column().unwrap().unwrap();
+ col_writer.close().unwrap();
+
+ row_group_writer.close().unwrap();
+ writer.close().unwrap();
+
+ let reader = SerializedFileReader::new(file).unwrap();
+ let result: Vec<Option<&Vec<SortingColumn>>> = reader
+ .metadata()
+ .row_groups()
+ .iter()
+ .map(|f| f.sorting_columns())
+ .collect();
+ // validate the sorting column read match the one written above
+ assert_eq!(expected_result.as_ref(), result[0]);
+ }
+
#[test]
fn test_file_writer_empty_row_groups() {
let file = tempfile::tempfile().unwrap();