askoa commented on code in PR #3695:
URL: https://github.com/apache/arrow-rs/pull/3695#discussion_r1114306834
##########
arrow-ord/src/sort.rs:
##########
@@ -599,6 +618,208 @@ fn insert_valid_values<T>(result_slice: &mut [u32],
offset: usize, valids: &[(u3
append_valids(&mut result_slice[offset..offset + valids.len()]);
}
+// Sort run array and return sorted run array.
+// The output RunArray will be encoded at the same level as input run array.
+// For e.g. an input RunArray { run_ends = [2,4,6,8], values = [1,2,1,2] }
+// will result in output RunArray { run_ends = [2,4,6,8], values = [1,1,2,2] }
+// and not RunArray { run_ends = [4,8], values = [1,2] }
+fn sort_run(
+ values: &ArrayRef,
+ options: Option<SortOptions>,
+ limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+ match values.data_type() {
+ DataType::RunEndEncoded(run_ends_field, _) => match
run_ends_field.data_type() {
+ DataType::Int16 => sort_run_downcasted::<Int16Type>(values,
options, limit),
+ DataType::Int32 => sort_run_downcasted::<Int32Type>(values,
options, limit),
+ DataType::Int64 => sort_run_downcasted::<Int64Type>(values,
options, limit),
+ dt => unreachable!("Not valid run ends data type {dt}"),
+ },
+ dt => Err(ArrowError::InvalidArgumentError(format!(
+ "Input is not a run encoded array. Input data type {dt}"
+ ))),
+ }
+}
+
+fn sort_run_downcasted<R: RunEndIndexType>(
+ values: &ArrayRef,
+ options: Option<SortOptions>,
+ limit: Option<usize>,
+) -> Result<ArrayRef, ArrowError> {
+ let run_array = values.as_any().downcast_ref::<RunArray<R>>().unwrap();
+
+ // slice the run_array.values based on offset and length.
+ let start_physical_index = run_array.get_start_physical_index();
+ let end_physical_index = run_array.get_end_physical_index();
+ let physical_len = end_physical_index - start_physical_index + 1;
+ let run_values = run_array.values().slice(start_physical_index,
physical_len);
+
+ // All the values have to be sorted irrespective of input limit.
+ let values_indices = sort_to_indices(&run_values, options, None)?;
+
+ // Determine the length of output run array.
+ let mut remaining_len = if let Some(limit) = limit {
+ limit.min(run_array.len())
+ } else {
+ run_array.len()
+ };
+
+ let run_ends = run_array.run_ends();
+
+ let mut new_run_ends_builder =
BufferBuilder::<R::Native>::new(physical_len);
+ let mut new_run_end: usize = 0;
+ let mut new_physical_len: usize = 0;
+
+ // calculate run length of sorted value indices and add them to
new_run_ends.
+ for physical_index in values_indices.into_iter() {
+ // As the values were sliced with offset = start_physical_index, it
has to be added back
+ // before accesing `RunArray::run_ends`
+ let physical_index = physical_index.unwrap() as usize +
start_physical_index;
+
+ // calculate the run length.
+ let run_length = unsafe {
+ // Safety:
+ // The index will be within bounds as its in bounds of
start_physical_index
+ // and remaining_len, both of which are within bounds of run_array
+ if physical_index == start_physical_index {
+ run_ends.value_unchecked(physical_index).as_usize() -
run_array.offset()
+ } else if physical_index == end_physical_index {
+ run_array.offset() + run_array.len()
+ - run_ends.value_unchecked(physical_index - 1).as_usize()
+ } else {
+ run_ends.value_unchecked(physical_index).as_usize()
+ - run_ends.value_unchecked(physical_index - 1).as_usize()
+ }
+ };
+ let run_length = run_length.min(remaining_len);
+ new_run_end += run_length;
+
new_run_ends_builder.append(R::Native::from_usize(new_run_end).unwrap());
+ new_physical_len += 1;
+
+ remaining_len -= run_length;
+ if remaining_len == 0 {
+ break;
+ }
+ }
+
+ if remaining_len > 0 {
+ panic!("Remaining length should be zero its values is {remaining_len}")
+ }
+
+ let new_run_ends = unsafe {
+ // Safety:
+ // The function builds a valid run_ends array and hence need not be
validated.
+ ArrayDataBuilder::new(run_array.run_ends().data_type().clone())
+ .len(new_physical_len)
+ .null_count(0)
+ .add_buffer(new_run_ends_builder.finish())
+ .build_unchecked()
+ };
+
+ // slice the sorted value indices based on limit.
+ let new_values_indices: PrimitiveArray<UInt32Type> = values_indices
+ .slice(0, new_run_ends.len())
+ .into_data()
+ .into();
+
+ let new_values = take(&run_values, &new_values_indices, None)?;
+
+ // Build sorted run array
+ let builder = ArrayDataBuilder::new(run_array.data_type().clone())
+ .len(new_run_end)
+ .add_child_data(new_run_ends)
+ .add_child_data(new_values.into_data());
+ let array_data: RunArray<R> = unsafe {
+ // Safety:
+ // This function builds a valid run array and hence can skip
validation.
+ builder.build_unchecked().into()
+ };
+ Ok(Arc::new(array_data))
+}
+
+// Sort to indices for run encoded array.
+// This function will be slow for run arrays as it decodes the physical indices
+// to logical indices; to get a run array back, the logical indices have to be
+// encoded back into a run array.
+fn sort_run_to_indices<R: RunEndIndexType>(
+ values: &ArrayRef,
+ options: &SortOptions,
+ limit: Option<usize>,
+) -> UInt32Array {
+ let run_array = values.as_any().downcast_ref::<RunArray<R>>().unwrap();
+
+ // slice the run_array.values based on offset and length.
+ let start_physical_index = run_array.get_start_physical_index();
+ let end_physical_index = run_array.get_end_physical_index();
+ let physical_len = end_physical_index - start_physical_index + 1;
+ let run_values = run_array.values().slice(start_physical_index,
physical_len);
+
+ // All the values have to be sorted irrespective of input limit.
+ let values_indices = sort_to_indices(&run_values, Some(*options),
None).unwrap();
+
+ let mut remaining_len = if let Some(limit) = limit {
+ limit.min(run_array.len())
+ } else {
+ run_array.len()
+ };
+
+ let mut result: Vec<u32> = Vec::with_capacity(remaining_len);
+
+ let run_ends = run_array.run_ends();
+
+ // Calculate `run length` of sorted value indices.
+ // Find the `logical index` of the value index.
+ // Add `logical index` to the output `run length` times.
+ for physical_index in values_indices.into_iter() {
+ // As the values were sliced with offset = start_physical_index, it
has to be added back
+ // before accesing `RunArray::run_ends`
+ let physical_index = physical_index.unwrap() as usize +
start_physical_index;
+
+ // calculate the run length and logical index of sorted values
+ let (run_length, logical_index_start) = unsafe {
+ // Safety:
+ // The index will be within bounds as its in bounds of
start_physical_index
+ // and len, both of which are within bounds of run_array
+ if physical_index == start_physical_index {
+ (
+ run_ends.value_unchecked(physical_index).as_usize()
+ - run_array.offset(),
+ 0,
+ )
+ } else if physical_index == end_physical_index {
+ let prev_run_end =
+ run_ends.value_unchecked(physical_index - 1).as_usize();
+ (
+ run_array.offset() + run_array.len() - prev_run_end,
+ prev_run_end - run_array.offset(),
+ )
+ } else {
+ let prev_run_end =
+ run_ends.value_unchecked(physical_index - 1).as_usize();
+ (
+ run_ends.value_unchecked(physical_index).as_usize() -
prev_run_end,
+ prev_run_end - run_array.offset(),
+ )
+ }
Review Comment:
This makes sense. Both functions look very similar for the most part. I used
a consumer to consume run_length and logical_start.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]