This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 689500f767 Rename function (#12654)
689500f767 is described below
commit 689500f767553d6a21768060986d614a2e86d9c8
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Sep 27 17:58:02 2024 -0700
Rename function (#12654)
---
datafusion/common/src/utils/mod.rs | 12 +++++-------
.../src/aggregate/groups_accumulator.rs | 5 ++---
datafusion/functions-aggregate/src/first_last.rs | 6 +++---
datafusion/physical-plan/src/sorts/partial_sort.rs | 4 ++--
datafusion/physical-plan/src/sorts/sort.rs | 4 ++--
.../physical-plan/src/windows/bounded_window_agg_exec.rs | 6 +++---
6 files changed, 17 insertions(+), 20 deletions(-)
diff --git a/datafusion/common/src/utils/mod.rs
b/datafusion/common/src/utils/mod.rs
index 83f98ff9af..355d699721 100644
--- a/datafusion/common/src/utils/mod.rs
+++ b/datafusion/common/src/utils/mod.rs
@@ -98,7 +98,7 @@ pub fn get_record_batch_at_indices(
record_batch: &RecordBatch,
indices: &PrimitiveArray<UInt32Type>,
) -> Result<RecordBatch> {
- let new_columns = get_arrayref_at_indices(record_batch.columns(),
indices)?;
+ let new_columns = take_arrays(record_batch.columns(), indices)?;
RecordBatch::try_new_with_options(
record_batch.schema(),
new_columns,
@@ -291,10 +291,7 @@ pub(crate) fn parse_identifiers(s: &str) ->
Result<Vec<Ident>> {
}
/// Construct a new [`Vec`] of [`ArrayRef`] from the rows of the `arrays` at
the `indices`.
-pub fn get_arrayref_at_indices(
- arrays: &[ArrayRef],
- indices: &PrimitiveArray<UInt32Type>,
-) -> Result<Vec<ArrayRef>> {
+pub fn take_arrays(arrays: &[ArrayRef], indices: &dyn Array) ->
Result<Vec<ArrayRef>> {
arrays
.iter()
.map(|array| {
@@ -1023,8 +1020,9 @@ mod tests {
vec![2, 4],
];
for row_indices in row_indices_vec {
- let indices =
PrimitiveArray::from_iter_values(row_indices.iter().cloned());
- let chunk = get_arrayref_at_indices(&arrays, &indices)?;
+ let indices: PrimitiveArray<UInt32Type> =
+ PrimitiveArray::from_iter_values(row_indices.iter().cloned());
+ let chunk = take_arrays(&arrays, &indices)?;
for (arr_orig, arr_chunk) in arrays.iter().zip(&chunk) {
for (idx, orig_idx) in row_indices.iter().enumerate() {
let res1 = ScalarValue::try_from_array(arr_orig, *orig_idx
as usize)?;
diff --git
a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
index e60f689720..fbbf4d3035 100644
--- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
+++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
@@ -29,8 +29,7 @@ use arrow::{
datatypes::UInt32Type,
};
use datafusion_common::{
- arrow_datafusion_err, utils::get_arrayref_at_indices, DataFusionError,
Result,
- ScalarValue,
+ arrow_datafusion_err, utils::take_arrays, DataFusionError, Result,
ScalarValue,
};
use datafusion_expr_common::accumulator::Accumulator;
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
@@ -239,7 +238,7 @@ impl GroupsAccumulatorAdapter {
// reorder the values and opt_filter by batch_indices so that
// all values for each group are contiguous, then invoke the
// accumulator once per group with values
- let values = get_arrayref_at_indices(values, &batch_indices)?;
+ let values = take_arrays(values, &batch_indices)?;
let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;
// invoke each accumulator with the appropriate rows, first
diff --git a/datafusion/functions-aggregate/src/first_last.rs
b/datafusion/functions-aggregate/src/first_last.rs
index 30f5d5b075..41ac787579 100644
--- a/datafusion/functions-aggregate/src/first_last.rs
+++ b/datafusion/functions-aggregate/src/first_last.rs
@@ -24,7 +24,7 @@ use std::sync::Arc;
use arrow::array::{ArrayRef, AsArray, BooleanArray};
use arrow::compute::{self, lexsort_to_indices, SortColumn};
use arrow::datatypes::{DataType, Field};
-use datafusion_common::utils::{compare_rows, get_arrayref_at_indices,
get_row_at_idx};
+use datafusion_common::utils::{compare_rows, get_row_at_idx, take_arrays};
use datafusion_common::{
arrow_datafusion_err, internal_err, DataFusionError, Result, ScalarValue,
};
@@ -310,7 +310,7 @@ impl Accumulator for FirstValueAccumulator {
filtered_states
} else {
let indices = lexsort_to_indices(&sort_cols, None)?;
- get_arrayref_at_indices(&filtered_states, &indices)?
+ take_arrays(&filtered_states, &indices)?
};
if !ordered_states[0].is_empty() {
let first_row = get_row_at_idx(&ordered_states, 0)?;
@@ -613,7 +613,7 @@ impl Accumulator for LastValueAccumulator {
filtered_states
} else {
let indices = lexsort_to_indices(&sort_cols, None)?;
- get_arrayref_at_indices(&filtered_states, &indices)?
+ take_arrays(&filtered_states, &indices)?
};
if !ordered_states[0].is_empty() {
diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs
b/datafusion/physical-plan/src/sorts/partial_sort.rs
index 70a63e71ad..649c05d52e 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -104,7 +104,7 @@ impl PartialSortExec {
input: Arc<dyn ExecutionPlan>,
common_prefix_length: usize,
) -> Self {
- assert!(common_prefix_length > 0);
+ debug_assert!(common_prefix_length > 0);
let preserve_partitioning = false;
let cache = Self::compute_properties(&input, expr.clone(),
preserve_partitioning);
Self {
@@ -289,7 +289,7 @@ impl ExecutionPlan for PartialSortExec {
// Make sure common prefix length is larger than 0
// Otherwise, we should use SortExec.
- assert!(self.common_prefix_length > 0);
+ debug_assert!(self.common_prefix_length > 0);
Ok(Box::pin(PartialSortStream {
input,
diff --git a/datafusion/physical-plan/src/sorts/sort.rs
b/datafusion/physical-plan/src/sorts/sort.rs
index 64434e7a4a..91816713c6 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -46,7 +46,7 @@ use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, SortField};
use arrow_array::{Array, RecordBatchOptions, UInt32Array};
use arrow_schema::DataType;
-use datafusion_common::utils::get_arrayref_at_indices;
+use datafusion_common::utils::take_arrays;
use datafusion_common::{internal_err, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
@@ -617,7 +617,7 @@ pub fn sort_batch(
lexsort_to_indices(&sort_columns, fetch)?
};
- let columns = get_arrayref_at_indices(batch.columns(), &indices)?;
+ let columns = take_arrays(batch.columns(), &indices)?;
let options =
RecordBatchOptions::new().with_row_count(Some(indices.len()));
Ok(RecordBatch::try_new_with_options(
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index 001e134581..9510baab51 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -49,8 +49,8 @@ use arrow::{
use datafusion_common::hash_utils::create_hashes;
use datafusion_common::stats::Precision;
use datafusion_common::utils::{
- evaluate_partition_ranges, get_arrayref_at_indices, get_at_indices,
- get_record_batch_at_indices, get_row_at_idx,
+ evaluate_partition_ranges, get_at_indices, get_record_batch_at_indices,
+ get_row_at_idx, take_arrays,
};
use datafusion_common::{arrow_datafusion_err, exec_err, DataFusionError,
Result};
use datafusion_execution::TaskContext;
@@ -542,7 +542,7 @@ impl PartitionSearcher for LinearSearch {
// We should emit columns according to row index ordering.
let sorted_indices = sort_to_indices(&all_indices, None, None)?;
// Construct new column according to row ordering. This fixes ordering
- get_arrayref_at_indices(&new_columns, &sorted_indices).map(Some)
+ take_arrays(&new_columns, &sorted_indices).map(Some)
}
fn evaluate_partition_batches(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]