wjones127 commented on code in PR #4389:
URL: https://github.com/apache/arrow-rs/pull/4389#discussion_r1224878860


##########
parquet/src/column/writer/mod.rs:
##########
@@ -1152,9 +1203,72 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: 
&[u8]) -> bool {
     (a[1..]) > (b[1..])
 }
 
+/// Truncate a UTF8 slice to the longest prefix that is still a valid UTF8 
string, while being less than `max_len` bytes.
+fn truncate_utf8(data: &str, max_len: usize) -> Vec<u8> {
+    if data.len() <= max_len {
+        return data.as_bytes().to_vec();
+    }
+
+    let max_possible_len = data.len().min(max_len);
+
+    let mut char_indices = data.char_indices();
+    dbg!(max_possible_len);

Review Comment:
   ```suggestion
   ```



##########
parquet/src/column/writer/mod.rs:
##########
@@ -1152,9 +1203,72 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: 
&[u8]) -> bool {
     (a[1..]) > (b[1..])
 }
 
+/// Truncate a UTF8 slice to the longest prefix that is still a valid UTF8 
string, while being less than `max_len` bytes.
+fn truncate_utf8(data: &str, max_len: usize) -> Vec<u8> {
+    if data.len() <= max_len {
+        return data.as_bytes().to_vec();
+    }
+
+    let max_possible_len = data.len().min(max_len);
+
+    let mut char_indices = data.char_indices();
+    dbg!(max_possible_len);
+
+    // We know `data` is a valid UTF8 encoded string, which means it has at 
least one valid UTF8 byte, which will make this loop exist
+    while let Some((idx, c)) = char_indices.next_back() {
+        let split_point = idx + c.len_utf8();
+        dbg!(split_point);

Review Comment:
   ```suggestion
   
   ```



##########
parquet/src/column/writer/mod.rs:
##########
@@ -1152,9 +1203,72 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: 
&[u8]) -> bool {
     (a[1..]) > (b[1..])
 }
 
+/// Truncate a UTF8 slice to the longest prefix that is still a valid UTF8 
string, while being less than `max_len` bytes.
+fn truncate_utf8(data: &str, max_len: usize) -> Vec<u8> {
+    if data.len() <= max_len {
+        return data.as_bytes().to_vec();
+    }
+
+    let max_possible_len = data.len().min(max_len);
+
+    let mut char_indices = data.char_indices();
+    dbg!(max_possible_len);
+
+    // We know `data` is a valid UTF8 encoded string, which means it has at 
least one valid UTF8 byte, which will make this loop exist
+    while let Some((idx, c)) = char_indices.next_back() {
+        let split_point = idx + c.len_utf8();
+        dbg!(split_point);
+        if split_point <= max_possible_len && 
data.is_char_boundary(split_point) {
+            // The character is between `idx` and `idx + _c.len_utf()`, so we 
take the longest possible slice.
+            // We take advantage of the fact that every byte prefix of a UTF8 
code point is also a valid UTF8 code point.
+
+            return data.as_bytes()[0..split_point].to_vec();
+        }
+    }
+
+    unreachable!()
+}
+
+/// Truncate a binary slice to make sure its length is less than `max_len`
+fn truncate_binary(data: &[u8], max_len: usize) -> Vec<u8> {
+    data[0..usize::min(data.len(), max_len)].to_vec()
+}
+
+/// Try and increment the bytes from right to left.
+fn increment(data: &mut [u8]) {
+    for byte in data.iter_mut().rev() {
+        if *byte == u8::MAX {
+            continue;
+        } else {
+            *byte += 1;
+            break;
+        }
+    }
+}
+
+/// Try and increment the the string's bytes from right to left, returning 
when the result is a valid UTF8 string.

Review Comment:
   ```suggestion
   /// Try and increment the string's bytes from right to left, returning when 
the result is a valid UTF8 string.
   ```



##########
parquet/src/column/writer/mod.rs:
##########
@@ -1152,6 +1181,56 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: 
&[u8]) -> bool {
     (a[1..]) > (b[1..])
 }
 
+/// Truncate a UTF8 slice to the longest prefix that is still a valid UTF8 
string, while being less than `max_len` bytes.
+fn truncate_utf8(data: &str, max_len: usize) -> Vec<u8> {
+    let mut max_possible_len = usize::min(data.len(), max_len);
+
+    if data.is_char_boundary(max_possible_len) {
+        return data.as_bytes()[0..max_possible_len].to_vec();
+    }
+
+    // UTF8 characters can only be up to 4 bytes long, so this loop has will 
only run up to 3 times before returning.
+    loop {
+        max_possible_len -= 1;
+        if data.is_char_boundary(max_possible_len) {
+            return data.as_bytes()[0..max_possible_len].to_vec();
+        }
+    }
+}
+
+/// Truncate a binary slice to make sure its length is less than `max_len`
+fn truncate_binary(data: &[u8], max_len: usize) -> Vec<u8> {
+    data[0..usize::min(data.len(), max_len)].to_vec()
+}
+
+/// Try and increment the bytes from right to left.
+fn increment(data: &mut [u8]) {
+    for byte in data.iter_mut().rev() {
+        if *byte == u8::MAX {

Review Comment:
   I think this is still outstanding, right? Is the solution to just return 
`None`?
   That seems to be what parquet-mr does: 
https://github.com/apache/parquet-mr/blob/9d80330ae4948787ac0bf4e4b0d990917f106440/parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/BinaryTruncator.java#L133-L133



##########
parquet/src/column/writer/mod.rs:
##########
@@ -2220,12 +2332,201 @@ mod tests {
         );
     }
 
+    /// Verify min/max value truncation in the column index works as expected
+    #[test]
+    fn test_column_offset_index_metadata_truncating() {
+        // write data
+        // and check the offset index and column index
+        let page_writer = get_test_page_writer();
+        let props = Default::default();
+        let mut writer =
+            get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, 
props);
+
+        let mut data = vec![FixedLenByteArray::default(); 3];
+        // This is the expected min value - "aaa..."
+        data[0].set_data(ByteBufferPtr::new(vec![97_u8; 200]));
+        // This is the expected max value - "ZZZ..."
+        data[1].set_data(ByteBufferPtr::new(vec![112_u8; 200]));
+        data[2].set_data(ByteBufferPtr::new(vec![98_u8; 200]));
+
+        writer.write_batch(&data, None, None).unwrap();
+
+        writer.flush_data_pages().unwrap();
+
+        let r = writer.close().unwrap();
+        let column_index = r.column_index.unwrap();
+        let offset_index = r.offset_index.unwrap();
+
+        assert_eq!(3, r.rows_written);
+
+        // column index
+        assert_eq!(1, column_index.null_pages.len());
+        assert_eq!(1, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
+        assert!(!column_index.null_pages[0]);
+        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
+
+        if let Some(stats) = r.metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 0);
+            assert_eq!(stats.distinct_count(), None);
+            if let Statistics::FixedLenByteArray(stats) = stats {
+                let column_index_min_value = 
column_index.min_values.get(0).unwrap();
+                let column_index_max_value = 
column_index.max_values.get(0).unwrap();
+
+                // Column index stats are truncated, while the column chunk's 
aren't.
+                assert_ne!(stats.min_bytes(), 
column_index_min_value.as_slice());
+                assert_ne!(stats.max_bytes(), 
column_index_max_value.as_slice());
+
+                assert_eq!(
+                    column_index_min_value.len(),
+                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
+                );
+                assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
+                assert_eq!(
+                    column_index_max_value.len(),
+                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
+                );
+
+                // We expect the last byte to be incremented
+                assert_eq!(
+                    *column_index_max_value.last().unwrap(),
+                    *column_index_max_value.first().unwrap() + 1
+                );
+            } else {
+                panic!("expecting Statistics::FixedLenByteArray");
+            }
+        } else {
+            panic!("metadata missing statistics");
+        }
+    }
+
+    #[test]
+    fn test_column_offset_index_truncating_spec_example() {
+        // write data
+        // and check the offset index and column index
+        let page_writer = get_test_page_writer();
+
+        // Truncate values at 1 byte
+        let builder =
+            
WriterProperties::builder().set_column_index_truncate_length(Some(1));
+        let props = Arc::new(builder.build());
+        let mut writer =
+            get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, 
props);
+
+        let mut data = vec![FixedLenByteArray::default(); 1];
+        // This is the expected min value
+        data[0].set_data(ByteBufferPtr::new(
+            String::from("Blart Versenwald III").into_bytes(),
+        ));
+
+        writer.write_batch(&data, None, None).unwrap();
+
+        writer.flush_data_pages().unwrap();
+
+        let r = writer.close().unwrap();
+        let column_index = r.column_index.unwrap();
+        let offset_index = r.offset_index.unwrap();
+
+        assert_eq!(1, r.rows_written);
+
+        // column index
+        assert_eq!(1, column_index.null_pages.len());
+        assert_eq!(1, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
+        assert!(!column_index.null_pages[0]);
+        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
+
+        if let Some(stats) = r.metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 0);
+            assert_eq!(stats.distinct_count(), None);
+            if let Statistics::FixedLenByteArray(_stats) = stats {
+                let column_index_min_value = 
column_index.min_values.get(0).unwrap();
+                let column_index_max_value = 
column_index.max_values.get(0).unwrap();
+
+                assert_ne!(column_index_min_value, column_index_max_value);
+
+                assert_eq!(column_index_min_value.len(), 1);
+                assert_eq!(column_index_max_value.len(), 1);
+
+                // Column index stats are truncated, while the column chunk's 
aren't.
+                assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
+                // In this case, they are equal because the max value is 
shorter than the default threshold

Review Comment:
   This comment doesn't seem right



##########
parquet/src/column/writer/mod.rs:
##########
@@ -2220,12 +2332,201 @@ mod tests {
         );
     }
 
+    /// Verify min/max value truncation in the column index works as expected
+    #[test]
+    fn test_column_offset_index_metadata_truncating() {
+        // write data
+        // and check the offset index and column index
+        let page_writer = get_test_page_writer();
+        let props = Default::default();
+        let mut writer =
+            get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, 
props);
+
+        let mut data = vec![FixedLenByteArray::default(); 3];
+        // This is the expected min value - "aaa..."
+        data[0].set_data(ByteBufferPtr::new(vec![97_u8; 200]));
+        // This is the expected max value - "ZZZ..."
+        data[1].set_data(ByteBufferPtr::new(vec![112_u8; 200]));
+        data[2].set_data(ByteBufferPtr::new(vec![98_u8; 200]));
+
+        writer.write_batch(&data, None, None).unwrap();
+
+        writer.flush_data_pages().unwrap();
+
+        let r = writer.close().unwrap();
+        let column_index = r.column_index.unwrap();
+        let offset_index = r.offset_index.unwrap();
+
+        assert_eq!(3, r.rows_written);
+
+        // column index
+        assert_eq!(1, column_index.null_pages.len());
+        assert_eq!(1, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
+        assert!(!column_index.null_pages[0]);
+        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
+
+        if let Some(stats) = r.metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 0);
+            assert_eq!(stats.distinct_count(), None);
+            if let Statistics::FixedLenByteArray(stats) = stats {
+                let column_index_min_value = 
column_index.min_values.get(0).unwrap();
+                let column_index_max_value = 
column_index.max_values.get(0).unwrap();
+
+                // Column index stats are truncated, while the column chunk's 
aren't.
+                assert_ne!(stats.min_bytes(), 
column_index_min_value.as_slice());
+                assert_ne!(stats.max_bytes(), 
column_index_max_value.as_slice());
+
+                assert_eq!(
+                    column_index_min_value.len(),
+                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
+                );
+                assert_eq!(column_index_min_value.as_slice(), &[97_u8; 64]);
+                assert_eq!(
+                    column_index_max_value.len(),
+                    DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.unwrap()
+                );
+
+                // We expect the last byte to be incremented
+                assert_eq!(
+                    *column_index_max_value.last().unwrap(),
+                    *column_index_max_value.first().unwrap() + 1
+                );
+            } else {
+                panic!("expecting Statistics::FixedLenByteArray");
+            }
+        } else {
+            panic!("metadata missing statistics");
+        }
+    }
+
+    #[test]
+    fn test_column_offset_index_truncating_spec_example() {
+        // write data
+        // and check the offset index and column index
+        let page_writer = get_test_page_writer();
+
+        // Truncate values at 1 byte
+        let builder =
+            
WriterProperties::builder().set_column_index_truncate_length(Some(1));
+        let props = Arc::new(builder.build());
+        let mut writer =
+            get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, 
props);
+
+        let mut data = vec![FixedLenByteArray::default(); 1];
+        // This is the expected min value
+        data[0].set_data(ByteBufferPtr::new(
+            String::from("Blart Versenwald III").into_bytes(),
+        ));
+
+        writer.write_batch(&data, None, None).unwrap();
+
+        writer.flush_data_pages().unwrap();
+
+        let r = writer.close().unwrap();
+        let column_index = r.column_index.unwrap();
+        let offset_index = r.offset_index.unwrap();
+
+        assert_eq!(1, r.rows_written);
+
+        // column index
+        assert_eq!(1, column_index.null_pages.len());
+        assert_eq!(1, offset_index.page_locations.len());
+        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
+        assert!(!column_index.null_pages[0]);
+        assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
+
+        if let Some(stats) = r.metadata.statistics() {
+            assert!(stats.has_min_max_set());
+            assert_eq!(stats.null_count(), 0);
+            assert_eq!(stats.distinct_count(), None);
+            if let Statistics::FixedLenByteArray(_stats) = stats {
+                let column_index_min_value = 
column_index.min_values.get(0).unwrap();
+                let column_index_max_value = 
column_index.max_values.get(0).unwrap();
+
+                assert_ne!(column_index_min_value, column_index_max_value);
+
+                assert_eq!(column_index_min_value.len(), 1);
+                assert_eq!(column_index_max_value.len(), 1);
+
+                // Column index stats are truncated, while the column chunk's 
aren't.
+                assert_eq!("B".as_bytes(), column_index_min_value.as_slice());
+                // In this case, they are equal because the max value is 
shorter than the default threshold
+                assert_eq!("C".as_bytes(), column_index_max_value.as_slice());
+            } else {
+                panic!("expecting Statistics::FixedLenByteArray");
+            }
+        } else {
+            panic!("metadata missing statistics");
+        }
+    }
+
     #[test]
     fn test_send() {
         fn test<T: Send>() {}
         test::<ColumnWriterImpl<Int32Type>>();
     }
 
+    #[test]
+    fn test_increment() {
+        let mut v = vec![0, 0, 0];
+        increment(&mut v);
+        assert_eq!(&v, &[0, 0, 1]);
+
+        // Handle overflow
+        let mut v = vec![0, 255, 255];
+        increment(&mut v);
+        assert_eq!(&v, &[1, 255, 255]);
+
+        // No-op if all bytes are u8::MAX
+        let mut v = vec![255, 255, 255];
+        increment(&mut v);
+        assert_eq!(&v, &[255, 255, 255]);

Review Comment:
   Is `&[255, 255, 255, 255] < &[255, 255, 255]`? Can we just truncate?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to