alamb commented on code in PR #5486:
URL: https://github.com/apache/arrow-rs/pull/5486#discussion_r1673919034


##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -269,12 +274,16 @@ impl FallbackEncoder {
             }
         };
 
+        let var_bytes = Some(self.variable_length_bytes);

Review Comment:
   ```suggestion
           let variable_length_bytes = Some(self.variable_length_bytes);
   ```
   
   Might be more consistent with the rest of the code. The same comment applies to several other uses below.



##########
parquet/src/column/writer/mod.rs:
##########
@@ -275,6 +293,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 num_buffered_values: 0,
                 num_buffered_rows: 0,
                 num_page_nulls: 0,
+                repetition_level_histogram: new_histogram_vec(max_rep_level),

Review Comment:
   Reading through the code in this PR, I find it hard to convince myself that the fields in `PageMetrics` are set / updated appropriately. Part of this is because the modifications happen inline rather than through, for example, encapsulated methods on `PageMetrics`.
   
   I realize this PR follows the existing pattern of modifying the fields of `PageMetrics` directly, but I think the new code is sufficiently complex that it is worth more encapsulation.
   
   For example, I would find it easier to reason about this code with something 
like
   
   ```rust
   let mut page_metrics = PageMetrics::new();
   // Collect histograms if there is more than a single level and if
   // page or chunk statistics are being collected
   if statistics_enabled != EnabledStatistics::None && max_level > 0 {
     page_metrics = page_metrics
       .with_repetition_level_histograms(max_rep_level)
       .with_definition_level_histograms(max_def_level)
   }
   ...
   
     Self {
               descr,
               props,
   ...
               page_metrics,
   ...
   }
   ```
   
   Likewise, below, rather than inlining the updates of the repetition / definition level histograms:
   
   ```rust
   if let Some(ref mut def_hist) = self.page_metrics.definition_level_histogram {
       // Count values and update histogram
       for &level in levels {
           process_def_level(level);
           def_hist[level as usize] += 1;
       }
   } else {
       // Count values
       for &level in levels {
           process_def_level(level);
       }
   }
   ```
   
   It could be encapsulated into something like
   
   ```rust
        self.page_metrics.update_definition_level_histogram(levels);
   ```
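   
   A minimal sketch of what such an encapsulated `PageMetrics` could look like (the field types and method names here are illustrative, not actual code from this PR):
   
   ```rust
   #[derive(Default)]
   struct PageMetrics {
       num_buffered_values: u32,
       num_buffered_rows: u32,
       num_page_nulls: u64,
       repetition_level_histogram: Option<Vec<i64>>,
       definition_level_histogram: Option<Vec<i64>>,
   }
   
   impl PageMetrics {
       fn new() -> Self {
           Self::default()
       }
   
       /// Allocate one counter per repetition level in `0..=max_level`
       fn with_repetition_level_histograms(mut self, max_level: i16) -> Self {
           self.repetition_level_histogram = Some(vec![0; max_level as usize + 1]);
           self
       }
   
       /// Allocate one counter per definition level in `0..=max_level`
       fn with_definition_level_histograms(mut self, max_level: i16) -> Self {
           self.definition_level_histogram = Some(vec![0; max_level as usize + 1]);
           self
       }
   
       /// Bump the counter for each observed definition level, if the
       /// histogram is being collected
       fn update_definition_level_histogram(&mut self, levels: &[i16]) {
           if let Some(ref mut hist) = self.definition_level_histogram {
               for &level in levels {
                   hist[level as usize] += 1;
               }
           }
       }
   }
   ```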
   
   



##########
parquet/src/file/metadata/mod.rs:
##########
@@ -690,6 +708,26 @@ impl ColumnChunkMetaData {
         Some(offset..(offset + length))
     }
 
+    /// Returns the number of bytes of variable length data after decoding.
+    /// Only set for BYTE_ARRAY columns.

Review Comment:
   I think it would be good to mention here that only some writers set this field.
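   
   For example, something like:
   
   ```rust
   /// Returns the number of bytes of variable length data after decoding.
   ///
   /// Only set for BYTE_ARRAY columns. Note that not all writers populate this
   /// field, so readers should not rely on it being present.
   ```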



##########
parquet/src/file/metadata/mod.rs:
##########
@@ -538,6 +553,9 @@ pub struct ColumnChunkMetaData {
     offset_index_length: Option<i32>,
     column_index_offset: Option<i64>,
     column_index_length: Option<i32>,
+    unencoded_byte_array_data_bytes: Option<i64>,

Review Comment:
   Can we please add these fields to the HeapSize tracking: https://github.com/apache/arrow-rs/blob/e61fb621d1723b768689092d00cbce38ff8dc813/parquet/src/file/metadata/memory.rs#L95-L94 (which I think was added after you originally wrote this PR)?
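   
   i.e. something along these lines (a sketch only -- the new lines would be added to the existing `HeapSize` accounting for `ColumnChunkMetaData` in `memory.rs`, and the histogram field names are assumed):
   
   ```rust
   // Inside the existing `impl HeapSize for ColumnChunkMetaData`, also
   // account for the heap usage of the new optional fields:
   self.unencoded_byte_array_data_bytes.heap_size()
       + self.repetition_level_histogram.heap_size()
       + self.definition_level_histogram.heap_size()
   ```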



##########
parquet/src/column/writer/mod.rs:
##########
@@ -529,12 +552,26 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             })?;
 
             let mut values_to_write = 0;
-            for &level in levels {
+
+            let mut process_def_level = |level| {
                 if level == self.descr.max_def_level() {
                     values_to_write += 1;
                 } else {
                     // We must always compute this as it is used to populate 
v2 pages
-                    self.page_metrics.num_page_nulls += 1
+                    self.page_metrics.num_page_nulls += 1;
+                }
+            };
+
+            if let Some(ref mut def_hist) = self.page_metrics.definition_level_histogram {

Review Comment:
   The same comment applies here about encapsulating the updates of page metrics -- I think it would reduce the repetition significantly.
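   
   For example, a rough sketch of the combined loop (assuming the hypothetical `update_definition_level_histogram` method from the suggestion above):
   
   ```rust
   // Count values and nulls in a single pass ...
   for &level in levels {
       if level == self.descr.max_def_level() {
           values_to_write += 1;
       } else {
           // We must always compute this as it is used to populate v2 pages
           self.page_metrics.num_page_nulls += 1;
       }
   }
   // ... and let PageMetrics handle the optional histogram bookkeeping
   self.page_metrics.update_definition_level_histogram(levels);
   ```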



##########
parquet/src/column/writer/mod.rs:
##########
@@ -887,7 +989,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         // Reset state.
         self.rep_levels_sink.clear();
         self.def_levels_sink.clear();
-        self.page_metrics = PageMetrics::default();
+
+        // don't clobber histogram vectors
+        self.page_metrics.num_buffered_values = 0;

Review Comment:
   I think the previous formulation was easier to reason about (reset page 
metrics)
   
   



##########
parquet/src/column/writer/mod.rs:
##########
@@ -782,8 +843,49 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             _ => None,
         };
 
+        if let Some(var_bytes) = values_data.variable_length_bytes {
+            *self.column_metrics.variable_length_bytes.get_or_insert(0) += var_bytes;
+        }
+
         // update column and offset index
-        self.update_column_offset_index(page_statistics.as_ref());
+        self.update_column_offset_index(
+            page_statistics.as_ref(),
+            values_data.variable_length_bytes,
+        );
+
+        // collect page histograms into chunk histograms and zero out page histograms
+        // TODO(ets): This could instead just add the vectors, and then allow page_metrics to be reset
+        // below. Would then need to recreate the histogram vectors, so `new_histogram_vec` above
+        // would need to become a function.
+        if let Some(ref mut page_hist) = self.page_metrics.repetition_level_histogram {

Review Comment:
   If you followed the suggestion above to move more of this logic into `PageMetrics`, maybe this code could look like
   
   ```rust
   self.column_metrics.update_from_page(&self.page_metrics);
   // reset metrics for a new page (zeros out buffered rows, resets histogram counts to 0)
   self.page_metrics.new_page()
   ...
   ```
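   
   A sketch of what `update_from_page` might do (the names are illustrative, and `ColumnMetrics` is assumed to carry chunk-level histogram fields mirroring those on `PageMetrics`):
   
   ```rust
   struct ColumnMetrics {
       repetition_level_histogram: Option<Vec<i64>>,
       definition_level_histogram: Option<Vec<i64>>,
   }
   
   impl ColumnMetrics {
       /// Fold a finished page's metrics into the chunk-level metrics by
       /// element-wise adding the page histograms into the chunk histograms
       fn update_from_page(&mut self, page: &PageMetrics) {
           if let (Some(chunk_hist), Some(page_hist)) = (
               self.repetition_level_histogram.as_mut(),
               page.repetition_level_histogram.as_ref(),
           ) {
               for (c, p) in chunk_hist.iter_mut().zip(page_hist) {
                   *c += p;
               }
           }
           // ... and likewise for the definition level histogram ...
       }
   }
   ```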
   



##########
parquet/src/file/page_index/index.rs:
##########
@@ -41,6 +41,10 @@ pub struct PageIndex<T> {
     pub max: Option<T>,
     /// Null values in the page
     pub null_count: Option<i64>,
+    /// Repetition level histogram for the page

Review Comment:
   Could we also note here that each element is the count of values with that repetition / definition level?
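   
   e.g. wording along the lines of:
   
   ```rust
   /// Repetition level histogram for the page
   ///
   /// `repetition_level_histogram[i]` is the number of values in this page
   /// whose repetition level is `i`
   ```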



##########
parquet/src/format.rs:
##########
@@ -634,6 +634,143 @@ impl From<&BoundaryOrder> for i32 {
   }
 }
 
+//

Review Comment:
   How was this re-generated? Do we have to update a pin to parquet-format 
anywhere?



##########
parquet/src/file/writer.rs:
##########
@@ -1828,4 +1836,235 @@ mod tests {
         let b_idx = &column_index[0][1];
         assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
     }
+
+    #[test]
+    fn test_byte_array_size_statistics() {
+        let message_type = "
+            message test_schema {
+                OPTIONAL BYTE_ARRAY a (UTF8);
+            }
+        ";
+        let schema = Arc::new(parse_message_type(message_type).unwrap());
+        let data = ByteArrayType::gen_vec(32, 7);
+        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
+        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
+        let file: File = tempfile::tempfile().unwrap();
+        let props = Arc::new(
+            WriterProperties::builder()
+                .set_statistics_enabled(EnabledStatistics::Page)
+                .build(),
+        );
+
+        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
+        let mut row_group_writer = writer.next_row_group().unwrap();
+
+        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
+        col_writer
+            .typed::<ByteArrayType>()
+            .write_batch(&data, Some(&def_levels), None)
+            .unwrap();
+        col_writer.close().unwrap();
+        row_group_writer.close().unwrap();
+        let file_metadata = writer.close().unwrap();
+
+        assert_eq!(file_metadata.row_groups.len(), 1);
+        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
+        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+
+        let check_def_hist = |def_hist: &Vec<i64>| {
+            assert_eq!(def_hist.len(), 2);
+            assert_eq!(def_hist[0], 3);
+            assert_eq!(def_hist[1], 7);
+        };
+
+        if let Some(ref meta_data) = file_metadata.row_groups[0].columns[0].meta_data {

Review Comment:
   I think the test should assert that this field is actually set -- there shouldn't be any non-determinism.
   
   For example:
   
   ```rust
   let meta_data = file_metadata.row_groups[0].columns[0].meta_data.as_ref().unwrap();
   ```



##########
parquet/src/file/page_index/index_reader.rs:
##########
@@ -109,6 +110,48 @@ pub fn read_pages_locations<R: ChunkReader>(
         .collect()
 }
 
+/// Reads per-column [`ParquetOffsetIndex`] for all columns of a row group by
+/// decoding [`OffsetIndex`] .
+///
+/// Returns a vector of `index[column_number]`.
+///
+/// Returns an empty vector if this row group does not contain an
+/// [`OffsetIndex`].
+///
+/// See [Offset Index Documentation] for more details.

Review Comment:
   I think this should be `Page Index Documentation`
   
   I find the naming of the structs in parquet-rs quite confusing here (the parquet "PageIndex" comprises the OffsetIndex and the ColumnIndex).



##########
parquet/src/file/page_index/offset_index.rs:
##########
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetOffsetIndex`] structure holding decoded [`OffsetIndex`] information
+
+use crate::errors::ParquetError;
+use crate::format::{OffsetIndex, PageLocation};
+
+/// [`OffsetIndex`] information for a column chunk. Contains offsets and sizes for each page
+/// in the chunk. Optionally stores fully decoded page sizes for BYTE_ARRAY columns.
+#[derive(Debug, Clone, PartialEq)]
+pub struct ParquetOffsetIndex {

Review Comment:
   This is an excellent change (to make ParquetOffsetIndex an *actual* 
structure rather than a typedef). 
   
   I am surprised that the other implementation of https://github.com/apache/arrow-rs/blob/a008e9ed6d8cb6667af3ee40d4b3880de2c5c9ee/parquet/src/file/metadata/mod.rs#L72-L71 doesn't seem to be removed in this PR 🤔
   
   I have actually wanted to give `ParquetColumnIndex` the same treatment as well.
   



##########
parquet/src/file/writer.rs:
##########
@@ -1828,4 +1836,235 @@ mod tests {
         let b_idx = &column_index[0][1];
         assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
     }
+
+    #[test]
+    fn test_byte_array_size_statistics() {
+        let message_type = "
+            message test_schema {
+                OPTIONAL BYTE_ARRAY a (UTF8);
+            }
+        ";
+        let schema = Arc::new(parse_message_type(message_type).unwrap());
+        let data = ByteArrayType::gen_vec(32, 7);
+        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
+        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
+        let file: File = tempfile::tempfile().unwrap();
+        let props = Arc::new(
+            WriterProperties::builder()
+                .set_statistics_enabled(EnabledStatistics::Page)
+                .build(),
+        );
+
+        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
+        let mut row_group_writer = writer.next_row_group().unwrap();
+
+        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
+        col_writer
+            .typed::<ByteArrayType>()
+            .write_batch(&data, Some(&def_levels), None)
+            .unwrap();
+        col_writer.close().unwrap();
+        row_group_writer.close().unwrap();
+        let file_metadata = writer.close().unwrap();
+
+        assert_eq!(file_metadata.row_groups.len(), 1);
+        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
+        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+
+        let check_def_hist = |def_hist: &Vec<i64>| {
+            assert_eq!(def_hist.len(), 2);
+            assert_eq!(def_hist[0], 3);
+            assert_eq!(def_hist[1], 7);
+        };
+
+        if let Some(ref meta_data) = file_metadata.row_groups[0].columns[0].meta_data {

Review Comment:
   The same comment as above applies here: assert that the field is set rather than using `if let`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
