This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 371ec57e3 Expose `SortingColumn` in parquet files (#3103)
371ec57e3 is described below

commit 371ec57e370e76e8b690d404d9ca7a86c07a73e4
Author: askoa <[email protected]>
AuthorDate: Tue Nov 15 14:34:37 2022 -0500

    Expose `SortingColumn` in parquet files (#3103)
    
    * Expose `SortingColumn` from parquet files
    
    * fix formatting issues
    
    * empty commit
    
    * fix PR comments
    
    * formatting fix
    
    * add parquet round trip test
    
    * fix clippy error
    
    * update the test based on PR comment
    
    Co-authored-by: askoa <askoa@local>
---
 parquet/src/file/metadata.rs   | 21 +++++++++++++--
 parquet/src/file/properties.rs | 16 +++++++++++
 parquet/src/file/writer.rs     | 61 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 0804890c2..895776a8a 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -37,7 +37,7 @@ use std::sync::Arc;
 
 use crate::format::{
     BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, 
PageLocation,
-    RowGroup,
+    RowGroup, SortingColumn,
 };
 
 use crate::basic::{ColumnOrder, Compression, Encoding, Type};
@@ -229,6 +229,7 @@ pub type RowGroupMetaDataPtr = Arc<RowGroupMetaData>;
 pub struct RowGroupMetaData {
     columns: Vec<ColumnChunkMetaData>,
     num_rows: i64,
+    sorting_columns: Option<Vec<SortingColumn>>,
     total_byte_size: i64,
     schema_descr: SchemaDescPtr,
     page_offset_index: Option<Vec<Vec<PageLocation>>>,
@@ -260,6 +261,11 @@ impl RowGroupMetaData {
         self.num_rows
     }
 
+    /// Returns the sort order of the rows in this row group, if one was recorded.
+    pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
+        self.sorting_columns.as_ref()
+    }
+
     /// Total byte size of all uncompressed column data in this row group.
     pub fn total_byte_size(&self) -> i64 {
         self.total_byte_size
@@ -303,9 +309,11 @@ impl RowGroupMetaData {
             let cc = ColumnChunkMetaData::from_thrift(d.clone(), c)?;
             columns.push(cc);
         }
+        let sorting_columns = rg.sorting_columns;
         Ok(RowGroupMetaData {
             columns,
             num_rows,
+            sorting_columns,
             total_byte_size,
             schema_descr,
             page_offset_index: None,
@@ -318,7 +326,7 @@ impl RowGroupMetaData {
             columns: self.columns().iter().map(|v| v.to_thrift()).collect(),
             total_byte_size: self.total_byte_size,
             num_rows: self.num_rows,
-            sorting_columns: None,
+            sorting_columns: self.sorting_columns().cloned(),
             file_offset: None,
             total_compressed_size: None,
             ordinal: None,
@@ -331,6 +339,7 @@ pub struct RowGroupMetaDataBuilder {
     columns: Vec<ColumnChunkMetaData>,
     schema_descr: SchemaDescPtr,
     num_rows: i64,
+    sorting_columns: Option<Vec<SortingColumn>>,
     total_byte_size: i64,
     page_offset_index: Option<Vec<Vec<PageLocation>>>,
 }
@@ -342,6 +351,7 @@ impl RowGroupMetaDataBuilder {
             columns: Vec::with_capacity(schema_descr.num_columns()),
             schema_descr,
             num_rows: 0,
+            sorting_columns: None,
             total_byte_size: 0,
             page_offset_index: None,
         }
@@ -353,6 +363,12 @@ impl RowGroupMetaDataBuilder {
         self
     }
 
+    /// Sets the sort order of this row group's rows; `None` records no ordering.
+    pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> 
Self {
+        self.sorting_columns = value;
+        self
+    }
+
     /// Sets total size in bytes for this row group.
     pub fn set_total_byte_size(mut self, value: i64) -> Self {
         self.total_byte_size = value;
@@ -384,6 +400,7 @@ impl RowGroupMetaDataBuilder {
         Ok(RowGroupMetaData {
             columns: self.columns,
             num_rows: self.num_rows,
+            sorting_columns: self.sorting_columns,
             total_byte_size: self.total_byte_size,
             schema_descr: self.schema_descr,
             page_offset_index: self.page_offset_index,
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index cf821df21..c65ba8035 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -69,6 +69,7 @@ use std::{collections::HashMap, sync::Arc};
 use crate::basic::{Compression, Encoding};
 use crate::compression::{CodecOptions, CodecOptionsBuilder};
 use crate::file::metadata::KeyValue;
+use crate::format::SortingColumn;
 use crate::schema::types::ColumnPath;
 
 const DEFAULT_PAGE_SIZE: usize = 1024 * 1024;
@@ -121,6 +122,7 @@ pub struct WriterProperties {
     pub(crate) key_value_metadata: Option<Vec<KeyValue>>,
     default_column_properties: ColumnProperties,
     column_properties: HashMap<ColumnPath, ColumnProperties>,
+    sorting_columns: Option<Vec<SortingColumn>>,
 }
 
 impl WriterProperties {
@@ -182,6 +184,11 @@ impl WriterProperties {
         self.key_value_metadata.as_ref()
     }
 
+    /// Returns the sorting columns copied into each row group's metadata, if set.
+    pub fn sorting_columns(&self) -> Option<&Vec<SortingColumn>> {
+        self.sorting_columns.as_ref()
+    }
+
     /// Returns encoding for a data page, when dictionary encoding is enabled.
     /// This is not configurable.
     #[inline]
@@ -262,6 +269,7 @@ pub struct WriterPropertiesBuilder {
     key_value_metadata: Option<Vec<KeyValue>>,
     default_column_properties: ColumnProperties,
     column_properties: HashMap<ColumnPath, ColumnProperties>,
+    sorting_columns: Option<Vec<SortingColumn>>,
 }
 
 impl WriterPropertiesBuilder {
@@ -278,6 +286,7 @@ impl WriterPropertiesBuilder {
             key_value_metadata: None,
             default_column_properties: ColumnProperties::new(),
             column_properties: HashMap::new(),
+            sorting_columns: None,
         }
     }
 
@@ -294,6 +303,7 @@ impl WriterPropertiesBuilder {
             key_value_metadata: self.key_value_metadata,
             default_column_properties: self.default_column_properties,
             column_properties: self.column_properties,
+            sorting_columns: self.sorting_columns,
         }
     }
 
@@ -370,6 +380,12 @@ impl WriterPropertiesBuilder {
         self
     }
 
+    /// Sets the sort order of rows to record in every row group written, if any.
+    pub fn set_sorting_columns(mut self, value: Option<Vec<SortingColumn>>) -> 
Self {
+        self.sorting_columns = value;
+        self
+    }
+
     // ----------------------------------------------------------------------
     // Setters for any column (global)
 
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 2efaf7caf..b67bdccfe 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -434,6 +434,7 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
                 .set_column_metadata(column_chunks)
                 .set_total_byte_size(self.total_bytes_written as i64)
                 .set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
+                .set_sorting_columns(self.props.sorting_columns().cloned())
                 .build()?;
 
             let metadata = Arc::new(row_group_metadata);
@@ -653,6 +654,7 @@ mod tests {
         reader::{FileReader, SerializedFileReader, SerializedPageReader},
         statistics::{from_thrift, to_thrift, Statistics},
     };
+    use crate::format::SortingColumn;
     use crate::record::RowAccessor;
     use crate::schema::types::{ColumnDescriptor, ColumnPath};
     use crate::util::memory::ByteBufferPtr;
@@ -844,6 +846,65 @@ mod tests {
         assert_eq!(read_field, &field);
     }
 
+    #[test]
+    fn test_file_writer_with_sorting_columns_metadata() {
+        let file = tempfile::tempfile().unwrap();
+
+        let schema = Arc::new(
+            types::Type::group_type_builder("schema")
+                .with_fields(&mut vec![
+                    Arc::new(
+                        types::Type::primitive_type_builder("col1", 
Type::INT32)
+                            .build()
+                            .unwrap(),
+                    ),
+                    Arc::new(
+                        types::Type::primitive_type_builder("col2", 
Type::INT32)
+                            .build()
+                            .unwrap(),
+                    ),
+                ])
+                .build()
+                .unwrap(),
+        );
+        let expected_result = Some(vec![SortingColumn {
+            column_idx: 0,
+            descending: false,
+            nulls_first: true,
+        }]);
+        let props = Arc::new(
+            WriterProperties::builder()
+                .set_key_value_metadata(Some(vec![KeyValue::new(
+                    "key".to_string(),
+                    "value".to_string(),
+                )]))
+                .set_sorting_columns(expected_result.clone())
+                .build(),
+        );
+        let mut writer =
+            SerializedFileWriter::new(file.try_clone().unwrap(), schema, 
props).unwrap();
+        let mut row_group_writer = writer.next_row_group().expect("get row 
group writer");
+
+        let col_writer = row_group_writer.next_column().unwrap().unwrap();
+        col_writer.close().unwrap();
+
+        let col_writer = row_group_writer.next_column().unwrap().unwrap();
+        col_writer.close().unwrap();
+
+        row_group_writer.close().unwrap();
+        writer.close().unwrap();
+
+        let reader = SerializedFileReader::new(file).unwrap();
+        let result: Vec<Option<&Vec<SortingColumn>>> = reader
+            .metadata()
+            .row_groups()
+            .iter()
+            .map(|f| f.sorting_columns())
+            .collect();
+        // Round trip: sorting columns read back must match those written above
+        assert_eq!(expected_result.as_ref(), result[0]);
+    }
+
     #[test]
     fn test_file_writer_empty_row_groups() {
         let file = tempfile::tempfile().unwrap();

Reply via email to