alamb commented on code in PR #7649: URL: https://github.com/apache/arrow-rs/pull/7649#discussion_r2150765924
########## arrow-row/src/run.rs: ########## @@ -0,0 +1,695 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{variable, RowConverter, Rows, SortField}; +use arrow_array::types::RunEndIndexType; +use arrow_array::{PrimitiveArray, RunArray}; +use arrow_buffer::{ArrowNativeType, ScalarBuffer}; +use arrow_schema::{ArrowError, SortOptions}; + +/// Computes the lengths of each row for a RunEndEncodedArray +pub fn compute_lengths<R: RunEndIndexType>( + lengths: &mut [usize], + rows: &Rows, + array: &RunArray<R>, +) { + let run_ends = array.run_ends().values(); + let mut logical_start = 0; + + // Iterate over each run and apply the same length to all logical positions in the run + for (physical_idx, &run_end) in run_ends.iter().enumerate() { + let logical_end = run_end.as_usize(); + let row = rows.row(physical_idx); + let encoded_len = variable::encoded_len(Some(row.data)); + + // Add the same length for all logical positions in this run + for length in &mut lengths[logical_start..logical_end] { + *length += encoded_len; + } + + logical_start = logical_end; + } +} + +/// Encodes the provided `RunEndEncodedArray` to `out` with the provided `SortOptions` +/// +/// `rows` should contain the encoded values +pub fn 
encode<R: RunEndIndexType>( + data: &mut [u8], + offsets: &mut [usize], + rows: &Rows, + opts: SortOptions, + array: &RunArray<R>, +) { + let run_ends = array.run_ends(); + + let mut logical_idx = 0; + let mut offset_idx = 1; // Skip first offset + + // Iterate over each run + for physical_idx in 0..run_ends.values().len() { + let run_end = run_ends.values()[physical_idx].as_usize(); + + // Process all elements in this run + while logical_idx < run_end && offset_idx < offsets.len() { + let offset = &mut offsets[offset_idx]; + let out = &mut data[*offset..]; + + // Use variable-length encoding to make the data self-describing + let row = rows.row(physical_idx); Review Comment: Some random performance optimization thoughts (for some future PR): 1. You could hoist this out of the inner loop so it was executed once per physical value rather than once per logical value 2. You could potentially encode row once and then simply copy the encoded bytes for all remaining rows. This is probably significantly faster than re-encoding the same value over and over again. ########## arrow-row/src/run.rs: ########## @@ -0,0 +1,695 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::{variable, RowConverter, Rows, SortField}; +use arrow_array::types::RunEndIndexType; +use arrow_array::{PrimitiveArray, RunArray}; +use arrow_buffer::{ArrowNativeType, ScalarBuffer}; +use arrow_schema::{ArrowError, SortOptions}; + +/// Computes the lengths of each row for a RunEndEncodedArray +pub fn compute_lengths<R: RunEndIndexType>( + lengths: &mut [usize], + rows: &Rows, + array: &RunArray<R>, +) { + let run_ends = array.run_ends().values(); + let mut logical_start = 0; + + // Iterate over each run and apply the same length to all logical positions in the run + for (physical_idx, &run_end) in run_ends.iter().enumerate() { + let logical_end = run_end.as_usize(); + let row = rows.row(physical_idx); + let encoded_len = variable::encoded_len(Some(row.data)); + + // Add the same length for all logical positions in this run + for length in &mut lengths[logical_start..logical_end] { + *length += encoded_len; + } + + logical_start = logical_end; + } +} + +/// Encodes the provided `RunEndEncodedArray` to `out` with the provided `SortOptions` +/// +/// `rows` should contain the encoded values +pub fn encode<R: RunEndIndexType>( + data: &mut [u8], + offsets: &mut [usize], + rows: &Rows, + opts: SortOptions, + array: &RunArray<R>, +) { + let run_ends = array.run_ends(); + + let mut logical_idx = 0; + let mut offset_idx = 1; // Skip first offset + + // Iterate over each run + for physical_idx in 0..run_ends.values().len() { + let run_end = run_ends.values()[physical_idx].as_usize(); + + // Process all elements in this run + while logical_idx < run_end && offset_idx < offsets.len() { + let offset = &mut offsets[offset_idx]; + let out = &mut data[*offset..]; + + // Use variable-length encoding to make the data self-describing + let row = rows.row(physical_idx); + let bytes_written = variable::encode_one(out, Some(row.data), opts); + *offset += bytes_written; + + logical_idx += 1; + offset_idx += 1; + } + + // Break if we've processed all offsets + if offset_idx 
>= offsets.len() { + break; + } + } +} + +/// Decodes a RunEndEncodedArray from `rows` with the provided `options` +/// +/// # Safety +/// +/// `rows` must contain valid data for the provided `converter` +pub unsafe fn decode<R: RunEndIndexType>( + converter: &RowConverter, + rows: &mut [&[u8]], + field: &SortField, + validate_utf8: bool, +) -> Result<RunArray<R>, ArrowError> { + if rows.is_empty() { + let values = converter.convert_raw(&mut [], validate_utf8)?; + let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(vec![]), None); + return RunArray::<R>::try_new(&run_ends_array, &values[0]); + } + + // Decode each row's REE data and collect the decoded values + let mut decoded_values = Vec::new(); + let mut run_ends = Vec::new(); + let mut unique_row_indices = Vec::new(); + + // Process each row to extract its REE data (following decode_binary pattern) + let mut decoded_data = Vec::new(); + for (idx, row) in rows.iter_mut().enumerate() { + decoded_data.clear(); + // Extract the decoded value data from this row + let consumed = variable::decode_blocks(row, field.options, |block| { + decoded_data.extend_from_slice(block); + }); + + // Handle bit inversion for descending sort (following decode_binary pattern) + if field.options.descending { + decoded_data.iter_mut().for_each(|b| *b = !*b); + } + + // Update the row to point past the consumed REE data + *row = &row[consumed..]; + + // Check if this decoded value is the same as the previous one to identify runs + let is_new_run = + idx == 0 || decoded_data != decoded_values[*unique_row_indices.last().unwrap()]; + + if is_new_run { + // This is a new unique value - end the previous run if any + if idx > 0 { + run_ends.push(R::Native::usize_as(idx)); + } + unique_row_indices.push(decoded_values.len()); + decoded_values.push(decoded_data.clone()); + } + } + // Add the final run end + run_ends.push(R::Native::usize_as(rows.len())); + + // Convert the unique decoded values using the row converter + let mut 
unique_rows: Vec<&[u8]> = decoded_values.iter().map(|v| v.as_slice()).collect(); + let values = if unique_rows.is_empty() { + converter.convert_raw(&mut [], validate_utf8)? + } else { + converter.convert_raw(&mut unique_rows, validate_utf8)? + }; + + // Create run ends array + let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(run_ends), None); + + // Create the RunEndEncodedArray + RunArray::<R>::try_new(&run_ends_array, &values[0]) +} + +#[cfg(test)] +mod tests { + use crate::{RowConverter, SortField}; + use arrow_array::types::Int32Type; + use arrow_array::{Array, Int64Array, RunArray, StringArray}; + use arrow_schema::{DataType, SortOptions}; + use std::sync::Arc; + + #[test] + fn test_run_end_encoded_supports_datatype() { + // Test that the RowConverter correctly supports run-end encoded arrays + assert!(RowConverter::supports_datatype(&DataType::RunEndEncoded( + Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)), + ))); + } + + #[test] + fn test_run_end_encoded_round_trip_int64s() { + // Test round-trip correctness for RunEndEncodedArray with Int64 values making sure it + // doesn't just work with eg. strings (which are all the other tests). 
+ + let values = Int64Array::from(vec![100, 200, 100, 300]); + let run_ends = vec![2, 3, 5, 6]; + let array: RunArray<Int32Type> = + RunArray::try_new(&arrow_array::PrimitiveArray::from(run_ends), &values).unwrap(); + + let converter = RowConverter::new(vec![SortField::new(DataType::RunEndEncoded( + Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", DataType::Int64, true)), + ))]) + .unwrap(); + + let rows = converter + .convert_columns(&[Arc::new(array.clone())]) + .unwrap(); + + let arrays = converter.convert_rows(&rows).unwrap(); + let result = arrays[0] + .as_any() + .downcast_ref::<RunArray<Int32Type>>() + .unwrap(); + + assert_eq!(array.run_ends().values(), result.run_ends().values()); + assert_eq!(array.values().as_ref(), result.values().as_ref()); + } + + #[test] + fn test_run_end_encoded_round_trip_strings() { + // Test round-trip correctness for RunEndEncodedArray with strings + + let array: RunArray<Int32Type> = vec!["b", "b", "a"].into_iter().collect(); + + let converter = RowConverter::new(vec![SortField::new(DataType::RunEndEncoded( Review Comment: These test have a lot of boiler plate. Maybe we could make a function like `assert_roundtrip(array: RunArray<..>)` that captures the common pattern Not necessary, just something I noticed while reviewing ########## arrow-row/src/run.rs: ########## @@ -0,0 +1,695 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{variable, RowConverter, Rows, SortField}; +use arrow_array::types::RunEndIndexType; +use arrow_array::{PrimitiveArray, RunArray}; +use arrow_buffer::{ArrowNativeType, ScalarBuffer}; +use arrow_schema::{ArrowError, SortOptions}; + +/// Computes the lengths of each row for a RunEndEncodedArray +pub fn compute_lengths<R: RunEndIndexType>( + lengths: &mut [usize], + rows: &Rows, + array: &RunArray<R>, +) { + let run_ends = array.run_ends().values(); + let mut logical_start = 0; + + // Iterate over each run and apply the same length to all logical positions in the run + for (physical_idx, &run_end) in run_ends.iter().enumerate() { + let logical_end = run_end.as_usize(); + let row = rows.row(physical_idx); + let encoded_len = variable::encoded_len(Some(row.data)); + + // Add the same length for all logical positions in this run + for length in &mut lengths[logical_start..logical_end] { + *length += encoded_len; + } + + logical_start = logical_end; + } +} + +/// Encodes the provided `RunEndEncodedArray` to `out` with the provided `SortOptions` +/// +/// `rows` should contain the encoded values +pub fn encode<R: RunEndIndexType>( + data: &mut [u8], + offsets: &mut [usize], + rows: &Rows, + opts: SortOptions, + array: &RunArray<R>, +) { + let run_ends = array.run_ends(); + + let mut logical_idx = 0; + let mut offset_idx = 1; // Skip first offset + + // Iterate over each run + for physical_idx in 0..run_ends.values().len() { + let run_end = run_ends.values()[physical_idx].as_usize(); + + // Process all elements in this run + while logical_idx 
< run_end && offset_idx < offsets.len() { + let offset = &mut offsets[offset_idx]; + let out = &mut data[*offset..]; + + // Use variable-length encoding to make the data self-describing + let row = rows.row(physical_idx); + let bytes_written = variable::encode_one(out, Some(row.data), opts); + *offset += bytes_written; + + logical_idx += 1; + offset_idx += 1; + } + + // Break if we've processed all offsets + if offset_idx >= offsets.len() { + break; + } + } +} + +/// Decodes a RunEndEncodedArray from `rows` with the provided `options` +/// +/// # Safety +/// +/// `rows` must contain valid data for the provided `converter` +pub unsafe fn decode<R: RunEndIndexType>( + converter: &RowConverter, + rows: &mut [&[u8]], + field: &SortField, + validate_utf8: bool, +) -> Result<RunArray<R>, ArrowError> { + if rows.is_empty() { + let values = converter.convert_raw(&mut [], validate_utf8)?; + let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(vec![]), None); + return RunArray::<R>::try_new(&run_ends_array, &values[0]); + } + + // Decode each row's REE data and collect the decoded values + let mut decoded_values = Vec::new(); + let mut run_ends = Vec::new(); + let mut unique_row_indices = Vec::new(); + + // Process each row to extract its REE data (following decode_binary pattern) + let mut decoded_data = Vec::new(); + for (idx, row) in rows.iter_mut().enumerate() { + decoded_data.clear(); + // Extract the decoded value data from this row + let consumed = variable::decode_blocks(row, field.options, |block| { + decoded_data.extend_from_slice(block); + }); + + // Handle bit inversion for descending sort (following decode_binary pattern) + if field.options.descending { + decoded_data.iter_mut().for_each(|b| *b = !*b); + } + + // Update the row to point past the consumed REE data + *row = &row[consumed..]; + + // Check if this decoded value is the same as the previous one to identify runs + let is_new_run = + idx == 0 || decoded_data != 
decoded_values[*unique_row_indices.last().unwrap()]; + + if is_new_run { + // This is a new unique value - end the previous run if any + if idx > 0 { + run_ends.push(R::Native::usize_as(idx)); + } + unique_row_indices.push(decoded_values.len()); + decoded_values.push(decoded_data.clone()); + } + } + // Add the final run end + run_ends.push(R::Native::usize_as(rows.len())); + + // Convert the unique decoded values using the row converter + let mut unique_rows: Vec<&[u8]> = decoded_values.iter().map(|v| v.as_slice()).collect(); + let values = if unique_rows.is_empty() { + converter.convert_raw(&mut [], validate_utf8)? + } else { + converter.convert_raw(&mut unique_rows, validate_utf8)? + }; + + // Create run ends array + let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(run_ends), None); + + // Create the RunEndEncodedArray + RunArray::<R>::try_new(&run_ends_array, &values[0]) +} + +#[cfg(test)] +mod tests { + use crate::{RowConverter, SortField}; + use arrow_array::types::Int32Type; + use arrow_array::{Array, Int64Array, RunArray, StringArray}; + use arrow_schema::{DataType, SortOptions}; + use std::sync::Arc; + + #[test] + fn test_run_end_encoded_supports_datatype() { + // Test that the RowConverter correctly supports run-end encoded arrays + assert!(RowConverter::supports_datatype(&DataType::RunEndEncoded( + Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)), + ))); + } + + #[test] + fn test_run_end_encoded_round_trip_int64s() { + // Test round-trip correctness for RunEndEncodedArray with Int64 values making sure it + // doesn't just work with eg. strings (which are all the other tests). 
+ + let values = Int64Array::from(vec![100, 200, 100, 300]); + let run_ends = vec![2, 3, 5, 6]; + let array: RunArray<Int32Type> = + RunArray::try_new(&arrow_array::PrimitiveArray::from(run_ends), &values).unwrap(); + + let converter = RowConverter::new(vec![SortField::new(DataType::RunEndEncoded( + Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", DataType::Int64, true)), + ))]) + .unwrap(); + + let rows = converter + .convert_columns(&[Arc::new(array.clone())]) + .unwrap(); + + let arrays = converter.convert_rows(&rows).unwrap(); + let result = arrays[0] + .as_any() + .downcast_ref::<RunArray<Int32Type>>() + .unwrap(); + + assert_eq!(array.run_ends().values(), result.run_ends().values()); + assert_eq!(array.values().as_ref(), result.values().as_ref()); + } + + #[test] + fn test_run_end_encoded_round_trip_strings() { + // Test round-trip correctness for RunEndEncodedArray with strings + + let array: RunArray<Int32Type> = vec!["b", "b", "a"].into_iter().collect(); + + let converter = RowConverter::new(vec![SortField::new(DataType::RunEndEncoded( + Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)), + ))]) + .unwrap(); + + let rows = converter + .convert_columns(&[Arc::new(array.clone())]) + .unwrap(); + + let arrays = converter.convert_rows(&rows).unwrap(); + let result = arrays[0] + .as_any() + .downcast_ref::<RunArray<Int32Type>>() + .unwrap(); + + assert_eq!(array.run_ends().values(), result.run_ends().values()); + assert_eq!(array.values().as_ref(), result.values().as_ref()); + } + + #[test] + fn test_run_end_encoded_round_trip_strings_with_nulls() { + // Test round-trip correctness for RunEndEncodedArray with nulls + + let array: RunArray<Int32Type> = vec![Some("b"), Some("b"), None, Some("a")] + .into_iter() + .collect(); + + let converter = RowConverter::new(vec![SortField::new(DataType::RunEndEncoded( 
+ Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)), + ))]) + .unwrap(); + + let rows = converter + .convert_columns(&[Arc::new(array.clone())]) + .unwrap(); + + let arrays = converter.convert_rows(&rows).unwrap(); + let result = arrays[0] + .as_any() + .downcast_ref::<RunArray<Int32Type>>() + .unwrap(); + + assert_eq!(array.run_ends().values(), result.run_ends().values()); + assert_eq!(array.values().as_ref(), result.values().as_ref()); + } + + #[test] + fn test_run_end_encoded_ascending_descending_round_trip() { + // Test round-trip correctness for ascending vs descending sort options + + let values_asc = + arrow_array::StringArray::from(vec![Some("apple"), Some("banana"), Some("cherry")]); + let run_ends_asc = vec![2, 4, 6]; + let run_array_asc: RunArray<Int32Type> = RunArray::try_new( + &arrow_array::PrimitiveArray::from(run_ends_asc), + &values_asc, + ) + .unwrap(); + + // Test ascending order + let converter_asc = RowConverter::new(vec![SortField::new_with_options( + DataType::RunEndEncoded( + Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)), + ), + SortOptions { + descending: false, + nulls_first: true, + }, + )]) + .unwrap(); + + let rows_asc = converter_asc + .convert_columns(&[Arc::new(run_array_asc.clone())]) + .unwrap(); + let arrays_asc = converter_asc.convert_rows(&rows_asc).unwrap(); + let result_asc = arrays_asc[0] + .as_any() + .downcast_ref::<RunArray<Int32Type>>() + .unwrap(); + + // Verify round-trip correctness for ascending + assert_eq!(run_array_asc.len(), result_asc.len()); + for i in 0..run_array_asc.len() { + let orig_physical = run_array_asc.get_physical_index(i); + let result_physical = result_asc.get_physical_index(i); + + let orig_values = run_array_asc + .values() + .as_any() + .downcast_ref::<arrow_array::StringArray>() + .unwrap(); + let result_values = 
result_asc + .values() + .as_any() + .downcast_ref::<arrow_array::StringArray>() + .unwrap(); + + assert_eq!( + orig_values.value(orig_physical), + result_values.value(result_physical), + "Ascending sort value mismatch at index {}", + i + ); + } + + // Test descending order + let converter_desc = RowConverter::new(vec![SortField::new_with_options( + DataType::RunEndEncoded( + Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)), + ), + SortOptions { + descending: true, + nulls_first: true, + }, + )]) + .unwrap(); + + let rows_desc = converter_desc + .convert_columns(&[Arc::new(run_array_asc.clone())]) + .unwrap(); + let arrays_desc = converter_desc.convert_rows(&rows_desc).unwrap(); + let result_desc = arrays_desc[0] + .as_any() + .downcast_ref::<RunArray<Int32Type>>() + .unwrap(); + + // Verify round-trip correctness for descending + assert_eq!(run_array_asc.len(), result_desc.len()); + for i in 0..run_array_asc.len() { + let orig_physical = run_array_asc.get_physical_index(i); + let result_physical = result_desc.get_physical_index(i); + + let orig_values = run_array_asc + .values() + .as_any() + .downcast_ref::<arrow_array::StringArray>() + .unwrap(); + let result_values = result_desc + .values() + .as_any() + .downcast_ref::<arrow_array::StringArray>() + .unwrap(); + + assert_eq!( + orig_values.value(orig_physical), + result_values.value(result_physical), + "Descending sort value mismatch at index {}", + i + ); + } + } + + #[test] + fn test_run_end_encoded_sort_configurations_basic() { + // Test that different sort configurations work and can round-trip successfully + + let test_array: RunArray<Int32Type> = vec!["test"].into_iter().collect(); + + let converter_asc = RowConverter::new(vec![SortField::new_with_options( + DataType::RunEndEncoded( + Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", 
DataType::Utf8, true)), + ), + SortOptions { + descending: false, + nulls_first: true, + }, + )]) + .unwrap(); + + let converter_desc = RowConverter::new(vec![SortField::new_with_options( + DataType::RunEndEncoded( + Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)), + ), + SortOptions { + descending: true, + nulls_first: true, + }, + )]) + .unwrap(); + + let rows_test_asc = converter_asc + .convert_columns(&[Arc::new(test_array.clone())]) + .unwrap(); + let rows_test_desc = converter_desc + .convert_columns(&[Arc::new(test_array.clone())]) + .unwrap(); + + // Convert back to verify both configurations work + let result_test_asc = converter_asc.convert_rows(&rows_test_asc).unwrap(); Review Comment: I do think having a "assert_roundtrip" type function would make it clearer what was being tested here and would also make it easier to verify that the values were the same as well) ########## arrow-row/src/run.rs: ########## @@ -0,0 +1,695 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::{variable, RowConverter, Rows, SortField}; +use arrow_array::types::RunEndIndexType; +use arrow_array::{PrimitiveArray, RunArray}; +use arrow_buffer::{ArrowNativeType, ScalarBuffer}; +use arrow_schema::{ArrowError, SortOptions}; + +/// Computes the lengths of each row for a RunEndEncodedArray +pub fn compute_lengths<R: RunEndIndexType>( + lengths: &mut [usize], + rows: &Rows, + array: &RunArray<R>, +) { + let run_ends = array.run_ends().values(); + let mut logical_start = 0; + + // Iterate over each run and apply the same length to all logical positions in the run + for (physical_idx, &run_end) in run_ends.iter().enumerate() { + let logical_end = run_end.as_usize(); + let row = rows.row(physical_idx); + let encoded_len = variable::encoded_len(Some(row.data)); + + // Add the same length for all logical positions in this run + for length in &mut lengths[logical_start..logical_end] { + *length += encoded_len; + } + + logical_start = logical_end; + } +} + +/// Encodes the provided `RunEndEncodedArray` to `out` with the provided `SortOptions` +/// +/// `rows` should contain the encoded values +pub fn encode<R: RunEndIndexType>( + data: &mut [u8], + offsets: &mut [usize], + rows: &Rows, + opts: SortOptions, + array: &RunArray<R>, +) { + let run_ends = array.run_ends(); + + let mut logical_idx = 0; + let mut offset_idx = 1; // Skip first offset + + // Iterate over each run + for physical_idx in 0..run_ends.values().len() { + let run_end = run_ends.values()[physical_idx].as_usize(); + + // Process all elements in this run + while logical_idx < run_end && offset_idx < offsets.len() { + let offset = &mut offsets[offset_idx]; + let out = &mut data[*offset..]; + + // Use variable-length encoding to make the data self-describing + let row = rows.row(physical_idx); + let bytes_written = variable::encode_one(out, Some(row.data), opts); + *offset += bytes_written; + + logical_idx += 1; + offset_idx += 1; + } + + // Break if we've processed all offsets + if offset_idx 
>= offsets.len() { + break; + } + } +} + +/// Decodes a RunEndEncodedArray from `rows` with the provided `options` +/// +/// # Safety +/// +/// `rows` must contain valid data for the provided `converter` +pub unsafe fn decode<R: RunEndIndexType>( + converter: &RowConverter, + rows: &mut [&[u8]], + field: &SortField, + validate_utf8: bool, +) -> Result<RunArray<R>, ArrowError> { + if rows.is_empty() { + let values = converter.convert_raw(&mut [], validate_utf8)?; + let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(vec![]), None); + return RunArray::<R>::try_new(&run_ends_array, &values[0]); + } + + // Decode each row's REE data and collect the decoded values + let mut decoded_values = Vec::new(); + let mut run_ends = Vec::new(); + let mut unique_row_indices = Vec::new(); + + // Process each row to extract its REE data (following decode_binary pattern) + let mut decoded_data = Vec::new(); + for (idx, row) in rows.iter_mut().enumerate() { + decoded_data.clear(); + // Extract the decoded value data from this row + let consumed = variable::decode_blocks(row, field.options, |block| { + decoded_data.extend_from_slice(block); + }); + + // Handle bit inversion for descending sort (following decode_binary pattern) + if field.options.descending { + decoded_data.iter_mut().for_each(|b| *b = !*b); + } + + // Update the row to point past the consumed REE data + *row = &row[consumed..]; + + // Check if this decoded value is the same as the previous one to identify runs + let is_new_run = + idx == 0 || decoded_data != decoded_values[*unique_row_indices.last().unwrap()]; + + if is_new_run { + // This is a new unique value - end the previous run if any + if idx > 0 { + run_ends.push(R::Native::usize_as(idx)); + } + unique_row_indices.push(decoded_values.len()); + decoded_values.push(decoded_data.clone()); + } + } + // Add the final run end + run_ends.push(R::Native::usize_as(rows.len())); + + // Convert the unique decoded values using the row converter + let mut 
unique_rows: Vec<&[u8]> = decoded_values.iter().map(|v| v.as_slice()).collect(); + let values = if unique_rows.is_empty() { + converter.convert_raw(&mut [], validate_utf8)? + } else { + converter.convert_raw(&mut unique_rows, validate_utf8)? + }; + + // Create run ends array + let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(run_ends), None); + + // Create the RunEndEncodedArray + RunArray::<R>::try_new(&run_ends_array, &values[0]) +} + +#[cfg(test)] +mod tests { + use crate::{RowConverter, SortField}; + use arrow_array::types::Int32Type; + use arrow_array::{Array, Int64Array, RunArray, StringArray}; + use arrow_schema::{DataType, SortOptions}; + use std::sync::Arc; + + #[test] + fn test_run_end_encoded_supports_datatype() { + // Test that the RowConverter correctly supports run-end encoded arrays + assert!(RowConverter::supports_datatype(&DataType::RunEndEncoded( + Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)), + ))); + } + + #[test] + fn test_run_end_encoded_round_trip_int64s() { + // Test round-trip correctness for RunEndEncodedArray with Int64 values making sure it + // doesn't just work with eg. strings (which are all the other tests). 
+ + let values = Int64Array::from(vec![100, 200, 100, 300]); + let run_ends = vec![2, 3, 5, 6]; + let array: RunArray<Int32Type> = + RunArray::try_new(&arrow_array::PrimitiveArray::from(run_ends), &values).unwrap(); + + let converter = RowConverter::new(vec![SortField::new(DataType::RunEndEncoded( + Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", DataType::Int64, true)), + ))]) + .unwrap(); + + let rows = converter + .convert_columns(&[Arc::new(array.clone())]) + .unwrap(); + + let arrays = converter.convert_rows(&rows).unwrap(); + let result = arrays[0] + .as_any() + .downcast_ref::<RunArray<Int32Type>>() + .unwrap(); + + assert_eq!(array.run_ends().values(), result.run_ends().values()); Review Comment: I found it strange that this test didn't just test that array and result were equal ```rust assert_eq!(array, result); ``` So I tried it locally, and it seems that it doesn't implement `PartialEq` Maybe something we can add in a follow on PR ``` error[E0369]: binary operation `==` cannot be applied to type `RunArray<arrow_array::types::Int32Type>` --> arrow-row/src/run.rs:203:9 | 203 | assert_eq!(array, result); | ^^^^^^^^^^^^^^^^^^^^^^^^^ | | | RunArray<arrow_array::types::Int32Type> | &RunArray<arrow_array::types::Int32Type> | note: the foreign item type `RunArray<arrow_array::types::Int32Type>` doesn't implement `PartialEq<&RunArray<arrow_array::types::Int32Type>>` --> /Users/andrewlamb/Software/arrow-rs/arrow-array/src/array/run_array.rs:63:1 | 63 | pub struct RunArray<R: RunEndIndexType> { | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ not implement `PartialEq<&RunArray<arrow_array::types::Int32Type>>` = note: this error originates in the macro `assert_eq` (in Nightly builds, run with -Z macro-backtrace for more info) ``` ########## arrow-row/src/run.rs: ########## @@ -0,0 +1,695 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{variable, RowConverter, Rows, SortField}; +use arrow_array::types::RunEndIndexType; +use arrow_array::{PrimitiveArray, RunArray}; +use arrow_buffer::{ArrowNativeType, ScalarBuffer}; +use arrow_schema::{ArrowError, SortOptions}; + +/// Computes the lengths of each row for a RunEndEncodedArray +pub fn compute_lengths<R: RunEndIndexType>( + lengths: &mut [usize], + rows: &Rows, + array: &RunArray<R>, +) { + let run_ends = array.run_ends().values(); + let mut logical_start = 0; + + // Iterate over each run and apply the same length to all logical positions in the run + for (physical_idx, &run_end) in run_ends.iter().enumerate() { + let logical_end = run_end.as_usize(); + let row = rows.row(physical_idx); + let encoded_len = variable::encoded_len(Some(row.data)); + + // Add the same length for all logical positions in this run + for length in &mut lengths[logical_start..logical_end] { + *length += encoded_len; + } + + logical_start = logical_end; + } +} + +/// Encodes the provided `RunEndEncodedArray` to `out` with the provided `SortOptions` +/// +/// `rows` should contain the encoded values +pub fn encode<R: RunEndIndexType>( + data: &mut [u8], + offsets: &mut [usize], + rows: &Rows, + opts: SortOptions, + array: &RunArray<R>, +) { + let run_ends = 
array.run_ends(); + + let mut logical_idx = 0; + let mut offset_idx = 1; // Skip first offset + + // Iterate over each run + for physical_idx in 0..run_ends.values().len() { + let run_end = run_ends.values()[physical_idx].as_usize(); + + // Process all elements in this run + while logical_idx < run_end && offset_idx < offsets.len() { + let offset = &mut offsets[offset_idx]; + let out = &mut data[*offset..]; + + // Use variable-length encoding to make the data self-describing + let row = rows.row(physical_idx); + let bytes_written = variable::encode_one(out, Some(row.data), opts); + *offset += bytes_written; + + logical_idx += 1; + offset_idx += 1; + } + + // Break if we've processed all offsets + if offset_idx >= offsets.len() { + break; + } + } +} + +/// Decodes a RunEndEncodedArray from `rows` with the provided `options` +/// +/// # Safety +/// +/// `rows` must contain valid data for the provided `converter` +pub unsafe fn decode<R: RunEndIndexType>( + converter: &RowConverter, + rows: &mut [&[u8]], + field: &SortField, + validate_utf8: bool, +) -> Result<RunArray<R>, ArrowError> { + if rows.is_empty() { + let values = converter.convert_raw(&mut [], validate_utf8)?; + let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(vec![]), None); + return RunArray::<R>::try_new(&run_ends_array, &values[0]); + } + + // Decode each row's REE data and collect the decoded values + let mut decoded_values = Vec::new(); + let mut run_ends = Vec::new(); + let mut unique_row_indices = Vec::new(); + + // Process each row to extract its REE data (following decode_binary pattern) + let mut decoded_data = Vec::new(); + for (idx, row) in rows.iter_mut().enumerate() { + decoded_data.clear(); + // Extract the decoded value data from this row + let consumed = variable::decode_blocks(row, field.options, |block| { + decoded_data.extend_from_slice(block); + }); + + // Handle bit inversion for descending sort (following decode_binary pattern) + if field.options.descending { + 
decoded_data.iter_mut().for_each(|b| *b = !*b); + } + + // Update the row to point past the consumed REE data + *row = &row[consumed..]; + + // Check if this decoded value is the same as the previous one to identify runs + let is_new_run = + idx == 0 || decoded_data != decoded_values[*unique_row_indices.last().unwrap()]; + + if is_new_run { + // This is a new unique value - end the previous run if any + if idx > 0 { + run_ends.push(R::Native::usize_as(idx)); + } + unique_row_indices.push(decoded_values.len()); + decoded_values.push(decoded_data.clone()); + } + } + // Add the final run end + run_ends.push(R::Native::usize_as(rows.len())); + + // Convert the unique decoded values using the row converter + let mut unique_rows: Vec<&[u8]> = decoded_values.iter().map(|v| v.as_slice()).collect(); + let values = if unique_rows.is_empty() { + converter.convert_raw(&mut [], validate_utf8)? + } else { + converter.convert_raw(&mut unique_rows, validate_utf8)? + }; + + // Create run ends array + let run_ends_array = PrimitiveArray::<R>::new(ScalarBuffer::from(run_ends), None); + + // Create the RunEndEncodedArray + RunArray::<R>::try_new(&run_ends_array, &values[0]) +} + +#[cfg(test)] +mod tests { + use crate::{RowConverter, SortField}; + use arrow_array::types::Int32Type; + use arrow_array::{Array, Int64Array, RunArray, StringArray}; + use arrow_schema::{DataType, SortOptions}; + use std::sync::Arc; + + #[test] + fn test_run_end_encoded_supports_datatype() { + // Test that the RowConverter correctly supports run-end encoded arrays + assert!(RowConverter::supports_datatype(&DataType::RunEndEncoded( + Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", DataType::Utf8, true)), + ))); + } + + #[test] + fn test_run_end_encoded_round_trip_int64s() { + // Test round-trip correctness for RunEndEncodedArray with Int64 values making sure it + // doesn't just work with eg. strings (which are all the other tests). 
+ + let values = Int64Array::from(vec![100, 200, 100, 300]); + let run_ends = vec![2, 3, 5, 6]; + let array: RunArray<Int32Type> = + RunArray::try_new(&arrow_array::PrimitiveArray::from(run_ends), &values).unwrap(); + + let converter = RowConverter::new(vec![SortField::new(DataType::RunEndEncoded( + Arc::new(arrow_schema::Field::new("run_ends", DataType::Int32, false)), + Arc::new(arrow_schema::Field::new("values", DataType::Int64, true)), + ))]) + .unwrap(); + + let rows = converter + .convert_columns(&[Arc::new(array.clone())]) + .unwrap(); + + let arrays = converter.convert_rows(&rows).unwrap(); + let result = arrays[0] + .as_any() + .downcast_ref::<RunArray<Int32Type>>() + .unwrap(); Review Comment: You can achieve the same thing with a little less code like this if you want ```suggestion let result = arrays[0].as_run::<Int32Type>(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org