tustvold commented on code in PR #4389:
URL: https://github.com/apache/arrow-rs/pull/4389#discussion_r1224334213
##########
parquet/src/file/properties.rs:
##########
@@ -626,6 +635,13 @@ impl WriterPropertiesBuilder {
self.get_mut_props(col).set_bloom_filter_ndv(value);
self
}
+
+ /// Sets the max length of min/max value fields in the column index.
+ /// If set to `None` - there's no effective limit.
+ pub fn set_value_truncate_length(mut self, max_length: Option<usize>) ->
Self {
Review Comment:
```suggestion
pub fn set_column_index_truncate_length(mut self, max_length:
Option<usize>) -> Self {
```
To match parquet-mr -
https://github.com/apache/parquet-mr/pull/481/files#diff-8df991c65afc50cdc9e9b1cd5d2c0dd2ba84af1096c5503a6af7303df4b61b33R310
##########
parquet/src/column/writer/mod.rs:
##########
@@ -1152,6 +1181,56 @@ fn compare_greater_byte_array_decimals(a: &[u8], b:
&[u8]) -> bool {
(a[1..]) > (b[1..])
}
+/// Truncate a UTF8 slice to the longest prefix that is still a valid UTF8
string, while being no more than `max_len` bytes.
+fn truncate_utf8(data: &str, max_len: usize) -> Vec<u8> {
+ let mut max_possible_len = usize::min(data.len(), max_len);
+
+ if data.is_char_boundary(max_possible_len) {
+ return data.as_bytes()[0..max_possible_len].to_vec();
+ }
+
+    // UTF8 characters can only be up to 4 bytes long, so this loop will
only run up to 3 times before returning.
+ loop {
+ max_possible_len -= 1;
+ if data.is_char_boundary(max_possible_len) {
+ return data.as_bytes()[0..max_possible_len].to_vec();
+ }
+ }
Review Comment:
FWIW you could do something like `data.char_indices().next_back()` or
something
##########
parquet/src/file/metadata.rs:
##########
@@ -868,13 +868,13 @@ impl ColumnIndexBuilder {
pub fn append(
&mut self,
null_page: bool,
- min_value: &[u8],
- max_value: &[u8],
+ min_value: Vec<u8>,
Review Comment:
This is an API change, which is fine
##########
parquet/src/column/writer/mod.rs:
##########
@@ -1152,6 +1181,56 @@ fn compare_greater_byte_array_decimals(a: &[u8], b:
&[u8]) -> bool {
(a[1..]) > (b[1..])
}
+/// Truncate a UTF8 slice to the longest prefix that is still a valid UTF8
string, while being no more than `max_len` bytes.
+fn truncate_utf8(data: &str, max_len: usize) -> Vec<u8> {
+ let mut max_possible_len = usize::min(data.len(), max_len);
Review Comment:
```suggestion
let mut max_possible_len = data.len().min(max_len);
```
##########
parquet/src/column/writer/mod.rs:
##########
@@ -2220,12 +2300,182 @@ mod tests {
);
}
+ /// Verify min/max value truncation in the column index works as expected
+ #[test]
+ fn test_column_offset_index_metadata_truncating() {
+ // write data
+ // and check the offset index and column index
+ let page_writer = get_test_page_writer();
+ let props = Default::default();
+ let mut writer =
+ get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0,
props);
+
+ let mut data = vec![FixedLenByteArray::default(); 3];
+ // This is the expected min value
+ data[0].set_data(ByteBufferPtr::new(vec![0_u8; 200]));
+ // This is the expected max value
+ data[1].set_data(ByteBufferPtr::new(
+ String::from("Zorro the masked hero").into_bytes(),
+ ));
+ data[2].set_data(ByteBufferPtr::new(Vec::from_iter(0_u8..129)));
+
+ writer.write_batch(&data, None, None).unwrap();
+
+ writer.flush_data_pages().unwrap();
+
+ let r = writer.close().unwrap();
+ let column_index = r.column_index.unwrap();
+ let offset_index = r.offset_index.unwrap();
+
+ assert_eq!(3, r.rows_written);
+
+ // column index
+ assert_eq!(1, column_index.null_pages.len());
+ assert_eq!(1, offset_index.page_locations.len());
+ assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
+ assert!(!column_index.null_pages[0]);
+ assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
+
+ if let Some(stats) = r.metadata.statistics() {
+ assert!(stats.has_min_max_set());
+ assert_eq!(stats.null_count(), 0);
+ assert_eq!(stats.distinct_count(), None);
+ if let Statistics::FixedLenByteArray(stats) = stats {
+ let column_index_min_value =
column_index.min_values.get(0).unwrap();
+ let column_index_max_value =
column_index.max_values.get(0).unwrap();
+
+ // Column index stats are truncated, while the column chunk's
aren't.
+ assert_ne!(stats.min_bytes(),
column_index_min_value.as_slice());
+ // We expect the max value to be incremented
+ let mut stats_max_bytes = stats.max_bytes().to_vec();
+ increment(&mut stats_max_bytes);
+ assert_eq!(stats_max_bytes, column_index_max_value.as_slice());
+
+ assert_eq!(column_index_min_value.len(), 128);
+ assert_eq!(column_index_min_value.as_slice(), &[0_u8; 128]);
+ assert_eq!(column_index_max_value.len(), 21);
+ } else {
+ panic!("expecting Statistics::FixedLenByteArray");
+ }
+ } else {
+ panic!("metadata missing statistics");
+ }
+ }
+
+ #[test]
+ fn test_column_offset_index_metadata_truncating_spec_example() {
+ // write data
+ // and check the offset index and column index
+ let page_writer = get_test_page_writer();
+ let builder =
WriterProperties::builder().set_value_truncate_length(Some(1));
+ let props = Arc::new(builder.build());
+ let mut writer =
+ get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0,
props);
+
+ let mut data = vec![FixedLenByteArray::default(); 1];
+ // This is the expected min value
+ data[0].set_data(ByteBufferPtr::new(
+ String::from("Blart Versenwald III").into_bytes(),
+ ));
+
+ writer.write_batch(&data, None, None).unwrap();
+
+ writer.flush_data_pages().unwrap();
+
+ let r = writer.close().unwrap();
+ let column_index = r.column_index.unwrap();
+ let offset_index = r.offset_index.unwrap();
+
+ assert_eq!(1, r.rows_written);
+
+ // column index
+ assert_eq!(1, column_index.null_pages.len());
+ assert_eq!(1, offset_index.page_locations.len());
+ assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
+ assert!(!column_index.null_pages[0]);
+ assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
+
+ if let Some(stats) = r.metadata.statistics() {
+ assert!(stats.has_min_max_set());
+ assert_eq!(stats.null_count(), 0);
+ assert_eq!(stats.distinct_count(), None);
+ if let Statistics::FixedLenByteArray(_stats) = stats {
+ let column_index_min_value =
column_index.min_values.get(0).unwrap();
+ let column_index_max_value =
column_index.max_values.get(0).unwrap();
+
+ assert_ne!(column_index_min_value, column_index_max_value);
+
+ assert_eq!(column_index_min_value.len(), 1);
+ assert_eq!(column_index_max_value.len(), 1);
+
+ // Column index stats are truncated, while the column chunk's
aren't.
+ assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
+ // In this case, they are equal because the max value is
shorter than the default threshold
+ assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
+ } else {
+ panic!("expecting Statistics::FixedLenByteArray");
+ }
+ } else {
+ panic!("metadata missing statistics");
+ }
+ }
+
#[test]
fn test_send() {
fn test<T: Send>() {}
test::<ColumnWriterImpl<Int32Type>>();
}
+ #[test]
+ fn test_increment() {
+ let mut v = vec![0, 0, 0];
+ increment(&mut v);
+ assert_eq!(&v, &[0, 0, 1]);
+
+ // Handle overflow
+ let mut v = vec![0, 255, 255];
+ increment(&mut v);
+ assert_eq!(&v, &[1, 255, 255]);
+
+ // No-op if all bytes are u8::MAX
+ let mut v = vec![255, 255, 255];
+ increment(&mut v);
+ assert_eq!(&v, &[255, 255, 255]);
+ }
+
+ #[test]
+ fn test_increment_utf8() {
+ // Basic ASCII case
+ let mut v = "hello".as_bytes().to_vec();
+ increment_utf8(&mut v);
+ assert_eq!(&v, "hellp".as_bytes());
+
+ // Utf8 string
+ let s = "โค๏ธ๐งก๐๐๐๐";
+ let mut v = s.as_bytes().to_vec();
+ increment_utf8(&mut v);
+ if let Ok(new) = String::from_utf8(v) {
Review Comment:
Could we also test that it is also larger
##########
parquet/src/column/writer/mod.rs:
##########
@@ -683,6 +683,35 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E>
{
.append_row_count(self.page_metrics.num_buffered_rows as i64);
}
+ fn truncate_min_value(&self, data: &[u8]) -> Vec<u8> {
+ let max_effective_len =
+ self.props.minmax_value_truncate_len().unwrap_or(data.len());
+
+ match std::str::from_utf8(data) {
+ Ok(str_data) => truncate_utf8(str_data, max_effective_len),
+ Err(_) => truncate_binary(data, max_effective_len),
+ }
+ }
+
+ fn truncate_max_value(&self, data: &[u8]) -> Vec<u8> {
+ // Even if the user disables value truncation, we want to make sure to
increase the max value so the user doesn't miss it.
Review Comment:
I think we should only increment if we are truncating, I think it would be a
bit surprising for users to write `b"hello"` and get back `b"hellp"`. Whereas
writing `"really long string"` and getting back an obviously truncated string
is perhaps more understandable `"really long su"`
##########
parquet/src/column/writer/mod.rs:
##########
@@ -1152,6 +1181,56 @@ fn compare_greater_byte_array_decimals(a: &[u8], b:
&[u8]) -> bool {
(a[1..]) > (b[1..])
}
+/// Truncate a UTF8 slice to the longest prefix that is still a valid UTF8
string, while being less than `max_len` bytes.
+fn truncate_utf8(data: &str, max_len: usize) -> Vec<u8> {
Review Comment:
```suggestion
fn truncate_utf8(data: &str, max_len: usize) -> &str {
```
Might allow preserving the UTF-8ness
##########
parquet/src/file/properties.rs:
##########
@@ -81,6 +81,7 @@ const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize =
DEFAULT_PAGE_SIZE;
const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
+const DEFAULT_COLUMN_INDEX_MINMAX_LEN: Option<usize> = Some(128);
Review Comment:
```suggestion
const DEFAULT_COLUMN_INDEX_MINMAX_LEN: Option<usize> = Some(64);
```
To match parquet-mr
##########
parquet/src/column/writer/mod.rs:
##########
@@ -1152,6 +1181,56 @@ fn compare_greater_byte_array_decimals(a: &[u8], b:
&[u8]) -> bool {
(a[1..]) > (b[1..])
}
+/// Truncate a UTF8 slice to the longest prefix that is still a valid UTF8
string, while being no more than `max_len` bytes.
+fn truncate_utf8(data: &str, max_len: usize) -> Vec<u8> {
+ let mut max_possible_len = usize::min(data.len(), max_len);
+
+ if data.is_char_boundary(max_possible_len) {
+ return data.as_bytes()[0..max_possible_len].to_vec();
+ }
+
+    // UTF8 characters can only be up to 4 bytes long, so this loop will
only run up to 3 times before returning.
+ loop {
+ max_possible_len -= 1;
+ if data.is_char_boundary(max_possible_len) {
+ return data.as_bytes()[0..max_possible_len].to_vec();
+ }
+ }
+}
+
+/// Truncate a binary slice to make sure its length is less than `max_len`
+fn truncate_binary(data: &[u8], max_len: usize) -> Vec<u8> {
+ data[0..usize::min(data.len(), max_len)].to_vec()
+}
+
+/// Try to increment the bytes from right to left.
+fn increment(data: &mut [u8]) {
+ for byte in data.iter_mut().rev() {
+ if *byte == u8::MAX {
+ continue;
+ } else {
+ *byte += 1;
+ break;
+ }
+ }
+}
+
+/// Try to increment the string's bytes from right to left, returning
when the result is a valid UTF8 string.
+fn increment_utf8(data: &mut Vec<u8>) {
+ for idx in (0..data.len()).rev() {
+ let byte = &mut data[idx];
Review Comment:
FWIW I _think_ you could make this loop more efficient by finding the index
of the first zero, and verifying that adding one won't impact it
Something like
```
let byte = &mut data[idx];
let incremented = *byte + 1;
if byte.leading_ones() == incremented.leading_ones() {
*byte = incremented;
}
```
This exploits the fact the number of leading ones is what influences the
UTF-8 "framing" - https://en.wikipedia.org/wiki/UTF-8#Encoding
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]