etseidl commented on code in PR #9700:
URL: https://github.com/apache/arrow-rs/pull/9700#discussion_r3094917409
##########
parquet/src/data_type.rs:
##########
@@ -57,6 +57,9 @@ const MICROSECONDS_IN_DAY: i64 = SECONDS_IN_DAY *
MICROSECONDS;
const NANOSECONDS_IN_DAY: i64 = SECONDS_IN_DAY * NANOSECONDS;
impl Int96 {
+ /// Size of an INT96 value in bytes.
+ const SIZE_IN_BYTES: usize = 12;
Review Comment:
```suggestion
pub const SIZE_IN_BYTES: usize = std::mem::size_of::<[u32; 3]>();
```
##########
parquet/src/encodings/encoding/plain_counter.rs:
##########
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::private::ParquetValueType;
+use crate::schema::types::ColumnDescriptor;
+
+/// A helper to estimate the size of plain encoding of the values
+/// that were written to the dictionary encoder.
+///
+/// This is used to enhance the dictionary fallback heuristic with the logic
+/// that the writer should fall back to the plain encoding when at a certain
point,
+/// e.g. after encoding the first batch, the total size of unencoded data
+/// is calculated as smaller than `(encodedSize + dictionarySize)`.
+pub struct PlainDataSizeCounter {
+ raw_data_byte_size: usize,
+ // Cached type length to improve performance for fixed-length types.
+ type_length: usize,
+}
+
+impl PlainDataSizeCounter {
+ pub fn new(desc: &ColumnDescriptor) -> Self {
+ Self {
+ raw_data_byte_size: 0,
+ type_length: desc.type_length() as usize,
+ }
+ }
+
+ /// Updates the counter with the given slice.
+ pub fn update<T: ParquetValueType>(&mut self, values: &[T]) {
+ let raw_size = match T::PHYSICAL_TYPE {
+ Type::BOOLEAN => values.len(),
+ Type::INT32 | Type::FLOAT => 4 * values.len(),
+ Type::INT64 | Type::DOUBLE => 8 * values.len(),
+ Type::INT96 => 12 * values.len(),
Review Comment:
```suggestion
Type::INT96 => Int96::SIZE_IN_BYTES * values.len(),
```
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -2572,6 +2572,74 @@ mod tests {
);
}
+ #[test]
+ fn arrow_writer_dictionary_fallback_on_unfavorable_compression() {
+ let schema = Arc::new(Schema::new(vec![Field::new("col",
DataType::Utf8, false)]));
+
+ let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
+
+ // Generate an array of 10 unique 10 character strings.
+ // This results in a dictionary encoding larger than the plain encoded
data,
+ // which should trigger a fallback to PLAIN encoding.
+ for i in 0..10 {
+ let value = i
+ .to_string()
+ .repeat(10)
+ .chars()
+ .take(10)
+ .collect::<String>();
+
+ builder.append_value(value);
+ }
+
+ let array = Arc::new(builder.finish());
+
+ let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
+
+ let file = tempfile::tempfile().unwrap();
+
+ // Set dictionary fallback to trigger fallback to PLAIN encoding on
unfavorable compression
+ let props = WriterProperties::builder()
+ .set_dictionary_fallback(DictionaryFallback::OnUnfavorableAfter(1))
+ .set_data_page_size_limit(1)
Review Comment:
```suggestion
.set_data_page_row_count_limit(2)
```
There's an issue here due to the fact that with these settings the page is
flushed before the check for dict fallback is called. Keeping the batch size at
1 but the row count at 2 should allow the check to actually force fallback,
resulting in one RLE_DICTIONARY encoded data page and 5 PLAIN encoded.
##########
parquet/src/data_type.rs:
##########
@@ -721,10 +724,8 @@ pub(crate) mod private {
fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) ->
Result<usize>;
- /// Return the encoded size for a type
- fn dict_encoding_size(&self) -> (usize, usize) {
- (std::mem::size_of::<Self>(), 1)
- }
+ /// Return the size in bytes for the value encoded in the dictionary.
+ fn dict_encoding_size(&self) -> usize;
Review Comment:
I never really paid attention to this before, but it's a curious name for
this function. Since the dictionary page is `PLAIN` encoded, this is really the
plain encoded size (and you actually use it as such in the counter). No need to
change the name, but perhaps the docstring could explain this.
##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -510,6 +524,11 @@ impl ColumnValueEncoder for ByteArrayEncoder {
Some(self.dict_encoder.as_ref()?.estimated_dict_page_size())
}
+ fn plain_encoded_data_size(&self) -> Option<usize> {
+ let counter = self.plain_data_size_counter.as_ref()?;
+ Some(counter.plain_encoded_data_size())
Review Comment:
```suggestion
Some(self.plain_data_size_counter.as_ref()?.plain_encoded_data_size())
```
##########
parquet/src/column/writer/encoder.rs:
##########
@@ -277,6 +300,11 @@ impl<T: DataType> ColumnValueEncoder for
ColumnValueEncoderImpl<T> {
Some(self.dict_encoder.as_ref()?.dict_encoded_size())
}
+ fn plain_encoded_data_size(&self) -> Option<usize> {
+ let counter = self.plain_data_size_counter.as_ref()?;
+ Some(counter.plain_encoded_data_size())
Review Comment:
```suggestion
Some(self.plain_data_size_counter.as_ref()?.plain_encoded_data_size())
```
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -2572,6 +2572,74 @@ mod tests {
);
}
+ #[test]
+ fn arrow_writer_dictionary_fallback_on_unfavorable_compression() {
+ let schema = Arc::new(Schema::new(vec![Field::new("col",
DataType::Utf8, false)]));
+
+ let mut builder = StringBuilder::with_capacity(100, 329 * 10_000);
+
+ // Generate an array of 10 unique 10 character strings.
+ // This results in a dictionary encoding larger than the plain encoded
data,
+ // which should trigger a fallback to PLAIN encoding.
+ for i in 0..10 {
+ let value = i
+ .to_string()
+ .repeat(10)
+ .chars()
+ .take(10)
+ .collect::<String>();
+
+ builder.append_value(value);
+ }
+
+ let array = Arc::new(builder.finish());
+
+ let batch = RecordBatch::try_new(schema, vec![array]).unwrap();
+
+ let file = tempfile::tempfile().unwrap();
+
+ // Set dictionary fallback to trigger fallback to PLAIN encoding on
unfavorable compression
+ let props = WriterProperties::builder()
+ .set_dictionary_fallback(DictionaryFallback::OnUnfavorableAfter(1))
+ .set_data_page_size_limit(1)
+ .set_write_batch_size(1)
+ .build();
+
+ let mut writer =
+ ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(),
Some(props))
+ .expect("Unable to write file");
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let options = ReadOptionsBuilder::new().with_page_index().build();
+ let reader =
+ SerializedFileReader::new_with_options(file.try_clone().unwrap(),
options).unwrap();
+
+ let column = reader.metadata().row_group(0).columns();
+
+ assert_eq!(column.len(), 1);
+
+ // We should write one row before falling back to PLAIN encoding so
there should still be a
+ // dictionary page.
+ assert!(
+ column[0].dictionary_page_offset().is_some(),
+ "Expected a dictionary page"
+ );
+
+ assert!(reader.metadata().offset_index().is_some());
Review Comment:
The following is not testing the encoding, merely counting the number of
data pages. Rather than this you should be examining the page encoding stats.
```rust
let options = ReadOptionsBuilder::new()
.with_encoding_stats_as_mask(false)
.build();
...
// check page encoding stats, should be one dict page, one dict
encoded page, and 9
// plain encoded pages
let stats = column[0].page_encoding_stats().unwrap();
println!("pes: {stats:?}");
assert!(
stats
.iter()
.any(|s| s.page_type == PageType::DICTIONARY_PAGE)
);
let num_dict_encoded: i32 = stats
.iter()
.filter(|s| {
s.page_type == PageType::DATA_PAGE && s.encoding ==
Encoding::RLE_DICTIONARY
})
.map(|s| s.count)
.sum();
assert_eq!(num_dict_encoded, 1);
let num_plain_encoded: i32 = stats
.iter()
.filter(|s| {
s.page_type == PageType::DATA_PAGE && s.encoding ==
Encoding::PLAIN
})
.map(|s| s.count)
.sum();
assert_eq!(num_plain_encoded, 9);
```
Coded this way, the test fails with
```
thread
'arrow::arrow_writer::tests::arrow_writer_dictionary_fallback_on_unfavorable_compression'
(10294973) panicked at parquet/src/arrow/arrow_writer/mod.rs:2649:9:
assertion `left == right` failed
left: 10
right: 1
```
indicating that all pages are dict encoded and fallback did not occur
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]