alamb commented on code in PR #5486:
URL: https://github.com/apache/arrow-rs/pull/5486#discussion_r1673919034
##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -269,12 +274,16 @@ impl FallbackEncoder {
}
};
+ let var_bytes = Some(self.variable_length_bytes);
Review Comment:
```suggestion
let variable_length_bytes = Some(self.variable_length_bytes);
```
Might be more consistent with the rest of the code. The same comment applies
to several other uses below
##########
parquet/src/column/writer/mod.rs:
##########
@@ -275,6 +293,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
num_buffered_values: 0,
num_buffered_rows: 0,
num_page_nulls: 0,
+ repetition_level_histogram: new_histogram_vec(max_rep_level),
Review Comment:
Reading through the code in this PR, I find it hard to convince myself that
the fields in `PageMetrics` are set / updated appropriately. Part of this is
because the modifications happen inline rather than through, for example,
encapsulated methods of `PageMetrics`.
I realize this PR follows the existing pattern of modifying the fields of
`PageMetrics` directly, but I think the new code is sufficiently complex that
it is worth more encapsulation.
For example, I would find it easier to reason about this code with something
like
```rust
let mut page_metrics = PageMetrics::new();
// Collect histograms if there is more than a single level and if
// page or chunk statistics are being collected
if statistics_enabled != EnabledStatistics::None && (max_rep_level > 0 || max_def_level > 0) {
    page_metrics = page_metrics
        .with_repetition_level_histograms(max_rep_level)
        .with_definition_level_histograms(max_def_level);
}
...
Self {
descr,
props,
...
page_metrics,
...
}
```
Likewise below, rather than inlining the update of `repetition_level_histogram`:
```rust
if let Some(ref mut def_hist) = self.page_metrics.definition_level_histogram {
// Count values and update histogram
for &level in levels {
process_def_level(level);
def_hist[level as usize] += 1;
}
} else {
// Count values
for &level in levels {
process_def_level(level);
}
}
```
It could be encapsulated into something like
```rust
self.page_metrics.update_definition_level_histogram(levels);
```
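To make the suggestion concrete, here is a rough sketch of what those
`PageMetrics` helpers could look like. The field and method names mirror the
ones used above, but everything here is purely illustrative, not the actual
arrow-rs definitions:
```rust
/// Illustrative sketch only -- field names are assumptions based on the
/// diff above, not the real arrow-rs definitions.
#[derive(Default)]
struct PageMetrics {
    num_buffered_values: u32,
    num_buffered_rows: u32,
    num_page_nulls: u64,
    repetition_level_histogram: Option<Vec<i64>>,
    definition_level_histogram: Option<Vec<i64>>,
}

impl PageMetrics {
    fn new() -> Self {
        Self::default()
    }

    /// Allocate a repetition level histogram with one bucket per level
    /// in `0..=max_level`.
    fn with_repetition_level_histograms(mut self, max_level: i16) -> Self {
        if max_level > 0 {
            self.repetition_level_histogram = Some(vec![0; max_level as usize + 1]);
        }
        self
    }

    /// Allocate a definition level histogram with one bucket per level
    /// in `0..=max_level`.
    fn with_definition_level_histograms(mut self, max_level: i16) -> Self {
        if max_level > 0 {
            self.definition_level_histogram = Some(vec![0; max_level as usize + 1]);
        }
        self
    }

    /// Bump the histogram bucket for each observed level; a no-op when
    /// histograms are not being collected.
    fn update_definition_level_histogram(&mut self, levels: &[i16]) {
        if let Some(ref mut hist) = self.definition_level_histogram {
            for &level in levels {
                hist[level as usize] += 1;
            }
        }
    }
}
```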
##########
parquet/src/file/metadata/mod.rs:
##########
@@ -690,6 +708,26 @@ impl ColumnChunkMetaData {
Some(offset..(offset + length))
}
+ /// Returns the number of bytes of variable length data after decoding.
+ /// Only set for BYTE_ARRAY columns.
Review Comment:
I think it would be good to mention here that only some writers set this
field
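Perhaps something like this (just a sketch of possible wording -- the field is
optional in the Parquet specification, so `None` means "unknown", not zero):
```rust
/// Returns the number of bytes of variable length data after decoding.
///
/// Only set for BYTE_ARRAY columns. This field is optional in the Parquet
/// specification and only some writers populate it, so readers should treat
/// `None` as "unknown" rather than zero.
```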
##########
parquet/src/file/metadata/mod.rs:
##########
@@ -538,6 +553,9 @@ pub struct ColumnChunkMetaData {
offset_index_length: Option<i32>,
column_index_offset: Option<i64>,
column_index_length: Option<i32>,
+ unencoded_byte_array_data_bytes: Option<i64>,
Review Comment:
Can we please add these fields to the HeapSize tracking:
https://github.com/apache/arrow-rs/blob/e61fb621d1723b768689092d00cbce38ff8dc813/parquet/src/file/metadata/memory.rs#L95-L94
(which I think was added after you originally wrote this PR)?
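For reference, a rough sketch of what that could look like in memory.rs
(assuming the existing `HeapSize` trait covers `Option<i64>`; the other terms
of the real impl are elided here):
```rust
impl HeapSize for ColumnChunkMetaData {
    fn heap_size(&self) -> usize {
        self.encodings.heap_size()
            // ... contributions from the other existing fields elided ...
            + self.unencoded_byte_array_data_bytes.heap_size()
    }
}
```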
##########
parquet/src/column/writer/mod.rs:
##########
@@ -529,12 +552,26 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
})?;
let mut values_to_write = 0;
- for &level in levels {
+
+ let mut process_def_level = |level| {
if level == self.descr.max_def_level() {
values_to_write += 1;
} else {
// We must always compute this as it is used to populate v2 pages
- self.page_metrics.num_page_nulls += 1
+ self.page_metrics.num_page_nulls += 1;
+ }
+ };
+
+ if let Some(ref mut def_hist) = self.page_metrics.definition_level_histogram {
Review Comment:
The same comment applies here about encapsulating the updates of page
metrics -- I think it would reduce the repetition significantly
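With a helper like `update_definition_level_histogram` (sketched above), this
call site could shrink to one delegated call plus a single loop, e.g.
(illustrative only):
```rust
// Hypothetical call site: histogram bookkeeping is delegated to
// PageMetrics; per-value accounting stays in one straightforward loop.
self.page_metrics.update_definition_level_histogram(levels);
for &level in levels {
    process_def_level(level);
}
```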
##########
parquet/src/column/writer/mod.rs:
##########
@@ -887,7 +989,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Reset state.
self.rep_levels_sink.clear();
self.def_levels_sink.clear();
- self.page_metrics = PageMetrics::default();
+
+ // don't clobber histogram vectors
+ self.page_metrics.num_buffered_values = 0;
Review Comment:
I think the previous formulation was easier to reason about (reset page
metrics)
##########
parquet/src/column/writer/mod.rs:
##########
@@ -782,8 +843,49 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
_ => None,
};
+ if let Some(var_bytes) = values_data.variable_length_bytes {
+ *self.column_metrics.variable_length_bytes.get_or_insert(0) += var_bytes;
+ }
+
// update column and offset index
- self.update_column_offset_index(page_statistics.as_ref());
+ self.update_column_offset_index(
+ page_statistics.as_ref(),
+ values_data.variable_length_bytes,
+ );
+
+ // collect page histograms into chunk histograms and zero out page histograms
+ // TODO(ets): This could instead just add the vectors, and then allow page_metrics to be reset
+ // below. Would then need to recreate the histogram vectors, so `new_histogram_vec` above
+ // would need to become a function.
+ if let Some(ref mut page_hist) = self.page_metrics.repetition_level_histogram {
Review Comment:
If you followed the suggestion above to move more of this logic into
`PageMetrics`, maybe this code could look like
```rust
self.column_metrics.update_from_page(&self.page_metrics);
// reset metrics for a new page (zeros out buffered rows, resets histogram counts to 0)
self.page_metrics.new_page()
...
```
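Building on the `PageMetrics` sketch above, those two methods might look
roughly like this (the names and the `ColumnMetrics` fields are assumptions
for illustration, not the actual code):
```rust
/// Hypothetical chunk-level mirror of the page histograms; in the real
/// code these fields would live on the column writer's chunk metrics.
struct ColumnMetrics {
    repetition_level_histogram: Option<Vec<i64>>,
    definition_level_histogram: Option<Vec<i64>>,
}

impl ColumnMetrics {
    /// Fold a finished page's histogram counts into the chunk-level totals.
    fn update_from_page(&mut self, page: &PageMetrics) {
        if let (Some(chunk_hist), Some(page_hist)) = (
            self.repetition_level_histogram.as_mut(),
            page.repetition_level_histogram.as_ref(),
        ) {
            for (chunk, page) in chunk_hist.iter_mut().zip(page_hist.iter()) {
                *chunk += *page;
            }
        }
        // ... and the same for the definition level histogram ...
    }
}

impl PageMetrics {
    /// Reset for the next page: clear counters but keep the histogram
    /// allocations, zeroing the buckets in place so they are not clobbered.
    fn new_page(&mut self) {
        self.num_buffered_values = 0;
        self.num_buffered_rows = 0;
        self.num_page_nulls = 0;
        if let Some(hist) = self.repetition_level_histogram.as_mut() {
            hist.iter_mut().for_each(|v| *v = 0);
        }
        if let Some(hist) = self.definition_level_histogram.as_mut() {
            hist.iter_mut().for_each(|v| *v = 0);
        }
    }
}
```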
##########
parquet/src/file/page_index/index.rs:
##########
@@ -41,6 +41,10 @@ pub struct PageIndex<T> {
pub max: Option<T>,
/// Null values in the page
pub null_count: Option<i64>,
+ /// Repetition level histogram for the page
Review Comment:
Could we also note here that each element is the count of values at the
corresponding repetition / definition level?
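Perhaps something like (a wording sketch only):
```rust
/// Repetition level histogram for the page
///
/// `repetition_level_histogram[i]` is a count of how many values are at
/// repetition level `i`.
```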
##########
parquet/src/format.rs:
##########
@@ -634,6 +634,143 @@ impl From<&BoundaryOrder> for i32 {
}
}
+//
Review Comment:
How was this re-generated? Do we have to update a pin to parquet-format
anywhere?
##########
parquet/src/file/writer.rs:
##########
@@ -1828,4 +1836,235 @@ mod tests {
let b_idx = &column_index[0][1];
assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
}
+
+ #[test]
+ fn test_byte_array_size_statistics() {
+ let message_type = "
+ message test_schema {
+ OPTIONAL BYTE_ARRAY a (UTF8);
+ }
+ ";
+ let schema = Arc::new(parse_message_type(message_type).unwrap());
+ let data = ByteArrayType::gen_vec(32, 7);
+ let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
+ let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
+ let file: File = tempfile::tempfile().unwrap();
+ let props = Arc::new(
+ WriterProperties::builder()
+ .set_statistics_enabled(EnabledStatistics::Page)
+ .build(),
+ );
+
+ let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
+ let mut row_group_writer = writer.next_row_group().unwrap();
+
+ let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
+ col_writer
+ .typed::<ByteArrayType>()
+ .write_batch(&data, Some(&def_levels), None)
+ .unwrap();
+ col_writer.close().unwrap();
+ row_group_writer.close().unwrap();
+ let file_metadata = writer.close().unwrap();
+
+ assert_eq!(file_metadata.row_groups.len(), 1);
+ assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
+ assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+
+ let check_def_hist = |def_hist: &Vec<i64>| {
+ assert_eq!(def_hist.len(), 2);
+ assert_eq!(def_hist[0], 3);
+ assert_eq!(def_hist[1], 7);
+ };
+
+ if let Some(ref meta_data) = file_metadata.row_groups[0].columns[0].meta_data {
Review Comment:
I think the test should assert that this field is actually set -- there
shouldn't be any non-determinism.
For example:
```rust
let meta_data = file_metadata.row_groups[0].columns[0].meta_data.as_ref().unwrap();
```
##########
parquet/src/file/page_index/index_reader.rs:
##########
@@ -109,6 +110,48 @@ pub fn read_pages_locations<R: ChunkReader>(
.collect()
}
+/// Reads per-column [`ParquetOffsetIndex`] for all columns of a row group by
+/// decoding [`OffsetIndex`].
+///
+/// Returns a vector of `index[column_number]`.
+///
+/// Returns an empty vector if this row group does not contain an
+/// [`OffsetIndex`].
+///
+/// See [Offset Index Documentation] for more details.
Review Comment:
I think this should be `Page Index Documentation`.
I find the naming of the structs in parquet-rs quite confusing here (the
parquet "PageIndex" comprises the `OffsetIndex` and the `ColumnIndex`)
##########
parquet/src/file/page_index/offset_index.rs:
##########
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`ParquetOffsetIndex`] structure holding decoded [`OffsetIndex`] information
+
+use crate::errors::ParquetError;
+use crate::format::{OffsetIndex, PageLocation};
+
+/// [`OffsetIndex`] information for a column chunk. Contains offsets and sizes for each page
+/// in the chunk. Optionally stores fully decoded page sizes for BYTE_ARRAY columns.
+#[derive(Debug, Clone, PartialEq)]
+pub struct ParquetOffsetIndex {
Review Comment:
This is an excellent change (to make ParquetOffsetIndex an *actual*
structure rather than a typedef).
I am surprised that the other implementation of
https://github.com/apache/arrow-rs/blob/a008e9ed6d8cb6667af3ee40d4b3880de2c5c9ee/parquet/src/file/metadata/mod.rs#L72-L71
doesn't seem to be removed in this PR 🤔
I have actually wanted to give `ParquetColumnIndex` the same treatment as
well
##########
parquet/src/file/writer.rs:
##########
@@ -1828,4 +1836,235 @@ mod tests {
let b_idx = &column_index[0][1];
assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
}
+
+ #[test]
+ fn test_byte_array_size_statistics() {
+ let message_type = "
+ message test_schema {
+ OPTIONAL BYTE_ARRAY a (UTF8);
+ }
+ ";
+ let schema = Arc::new(parse_message_type(message_type).unwrap());
+ let data = ByteArrayType::gen_vec(32, 7);
+ let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
+ let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
+ let file: File = tempfile::tempfile().unwrap();
+ let props = Arc::new(
+ WriterProperties::builder()
+ .set_statistics_enabled(EnabledStatistics::Page)
+ .build(),
+ );
+
+ let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
+ let mut row_group_writer = writer.next_row_group().unwrap();
+
+ let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
+ col_writer
+ .typed::<ByteArrayType>()
+ .write_batch(&data, Some(&def_levels), None)
+ .unwrap();
+ col_writer.close().unwrap();
+ row_group_writer.close().unwrap();
+ let file_metadata = writer.close().unwrap();
+
+ assert_eq!(file_metadata.row_groups.len(), 1);
+ assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
+ assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+
+ let check_def_hist = |def_hist: &Vec<i64>| {
+ assert_eq!(def_hist.len(), 2);
+ assert_eq!(def_hist[0], 3);
+ assert_eq!(def_hist[1], 7);
+ };
+
+ if let Some(ref meta_data) = file_metadata.row_groups[0].columns[0].meta_data {
Review Comment:
same thing below