This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 3de50c833b [minor] Use Vec instead of primitive builders (#12121)
3de50c833b is described below
commit 3de50c833b0e4bbb688b02240d11a1d5092c0d36
Author: Daniël Heres <[email protected]>
AuthorDate: Fri Aug 23 10:24:59 2024 +0200
[minor] Use Vec instead of primitive builders (#12121)
* Use vec instead of builder
* Compile
* Use vec instead of builder
* Revert
---
.../src/aggregate/groups_accumulator.rs | 8 +++----
datafusion/physical-plan/src/joins/hash_join.rs | 25 ++++++++-------------
.../physical-plan/src/joins/nested_loop_join.rs | 25 +++++++++------------
.../physical-plan/src/joins/symmetric_hash_join.rs | 10 ++++-----
datafusion/physical-plan/src/joins/utils.rs | 26 +++++++++-------------
datafusion/physical-plan/src/repartition/mod.rs | 14 ++++++------
6 files changed, 46 insertions(+), 62 deletions(-)
diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
index 3984b02c5f..1c97d22ec7 100644
--- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
+++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
@@ -24,7 +24,7 @@ pub mod nulls;
pub mod prim_op;
use arrow::{
- array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray, UInt32Builder},
+ array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray},
compute,
datatypes::UInt32Type,
};
@@ -170,7 +170,7 @@ impl GroupsAccumulatorAdapter {
let mut groups_with_rows = vec![];
// batch_indices holds indices into values, each group is contiguous
- let mut batch_indices = UInt32Builder::with_capacity(0);
+ let mut batch_indices = vec![];
// offsets[i] is index into batch_indices where the rows for
// group_index i starts
@@ -184,11 +184,11 @@ impl GroupsAccumulatorAdapter {
}
groups_with_rows.push(group_index);
- batch_indices.append_slice(indices);
+ batch_indices.extend_from_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
}
- let batch_indices = batch_indices.finish();
+ let batch_indices = batch_indices.into();
// reorder the values and opt_filter by batch_indices so that
// all values for each group are contiguous, then invoke the
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index e40a07cf62..7fac23ad55 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -49,8 +49,7 @@ use crate::{
};
use arrow::array::{
- Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, UInt32Array,
- UInt64Array,
+ Array, ArrayRef, BooleanArray, BooleanBufferBuilder, UInt32Array, UInt64Array,
};
use arrow::compute::kernels::cmp::{eq, not_distinct};
use arrow::compute::{and, concat_batches, take, FilterBuilder};
@@ -1204,13 +1203,11 @@ fn lookup_join_hashmap(
})
.collect::<Result<Vec<_>>>()?;
- let (mut probe_builder, mut build_builder, next_offset) = build_hashmap
+ let (probe_indices, build_indices, next_offset) = build_hashmap
.get_matched_indices_with_limit_offset(hashes_buffer, None, limit, offset);
- let build_indices: UInt64Array =
- PrimitiveArray::new(build_builder.finish().into(), None);
- let probe_indices: UInt32Array =
- PrimitiveArray::new(probe_builder.finish().into(), None);
+ let build_indices: UInt64Array = build_indices.into();
+ let probe_indices: UInt32Array = probe_indices.into();
let (build_indices, probe_indices) = equal_rows_arr(
&build_indices,
@@ -1566,7 +1563,7 @@ mod tests {
test::build_table_i32, test::exec::MockExec,
};
- use arrow::array::{Date32Array, Int32Array, UInt32Builder, UInt64Builder};
+ use arrow::array::{Date32Array, Int32Array};
use arrow::datatypes::{DataType, Field};
use arrow_array::StructArray;
use arrow_buffer::NullBuffer;
@@ -3169,17 +3166,13 @@ mod tests {
(0, None),
)?;
- let mut left_ids = UInt64Builder::with_capacity(0);
- left_ids.append_value(0);
- left_ids.append_value(1);
+ let left_ids: UInt64Array = vec![0, 1].into();
- let mut right_ids = UInt32Builder::with_capacity(0);
- right_ids.append_value(0);
- right_ids.append_value(1);
+ let right_ids: UInt32Array = vec![0, 1].into();
- assert_eq!(left_ids.finish(), l);
+ assert_eq!(left_ids, l);
- assert_eq!(right_ids.finish(), r);
+ assert_eq!(right_ids, r);
Ok(())
}
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 04a025c932..18de2de031 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -40,13 +40,12 @@ use crate::{
RecordBatchStream, SendableRecordBatchStream,
};
-use arrow::array::{
- BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, UInt64Builder,
-};
+use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array};
use arrow::compute::concat_batches;
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::{Schema, SchemaRef, UInt64Type};
use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
+use arrow_array::PrimitiveArray;
use datafusion_common::{exec_datafusion_err, JoinSide, Result, Statistics};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::TaskContext;
@@ -573,23 +572,21 @@ fn join_left_and_right_batch(
)
})?;
- let mut left_indices_builder = UInt64Builder::new();
- let mut right_indices_builder = UInt32Builder::new();
+ let mut left_indices_builder: Vec<u64> = vec![];
+ let mut right_indices_builder: Vec<u32> = vec![];
for (left_side, right_side) in indices {
- left_indices_builder
- .append_values(left_side.values(), &vec![true; left_side.len()]);
- right_indices_builder
- .append_values(right_side.values(), &vec![true; right_side.len()]);
+ left_indices_builder.extend(left_side.values());
+ right_indices_builder.extend(right_side.values());
}
- let left_side = left_indices_builder.finish();
- let right_side = right_indices_builder.finish();
+ let left_side: PrimitiveArray<UInt64Type> = left_indices_builder.into();
+ let right_side = right_indices_builder.into();
// set the left bitmap
// and only full join need the left bitmap
if need_produce_result_in_final(join_type) {
let mut bitmap = visited_left_side.lock();
- left_side.iter().flatten().for_each(|x| {
- bitmap.set_bit(x as usize, true);
+ left_side.values().iter().for_each(|x| {
+ bitmap.set_bit(*x as usize, true);
});
}
// adjust the two side indices base on the join type
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 1bf2ef2fd5..7dab664502 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -929,13 +929,11 @@ fn lookup_join_hashmap(
let (mut matched_probe, mut matched_build) = build_hashmap
.get_matched_indices(hash_values.iter().enumerate().rev(), deleted_offset);
- matched_probe.as_slice_mut().reverse();
- matched_build.as_slice_mut().reverse();
+ matched_probe.reverse();
+ matched_build.reverse();
- let build_indices: UInt64Array =
- PrimitiveArray::new(matched_build.finish().into(), None);
- let probe_indices: UInt32Array =
- PrimitiveArray::new(matched_probe.finish().into(), None);
+ let build_indices: UInt64Array = matched_build.into();
+ let probe_indices: UInt32Array = matched_probe.into();
let (build_indices, probe_indices) = equal_rows_arr(
&build_indices,
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 80d8815bde..8fdbf7041e 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -31,7 +31,7 @@ use crate::{
use arrow::array::{
downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array,
- UInt32BufferBuilder, UInt32Builder, UInt64Array, UInt64BufferBuilder,
+ UInt32Builder, UInt64Array,
};
use arrow::compute;
use arrow::datatypes::{Field, Schema, SchemaBuilder, UInt32Type, UInt64Type};
@@ -163,8 +163,8 @@ macro_rules! chain_traverse {
} else {
i
};
- $match_indices.append(match_row_idx);
- $input_indices.append($input_idx as u32);
+ $match_indices.push(match_row_idx);
+ $input_indices.push($input_idx as u32);
$remaining_output -= 1;
// Follow the chain to get the next index value
let next = $next_chain[match_row_idx as usize];
@@ -238,9 +238,9 @@ pub trait JoinHashMapType {
&self,
iter: impl Iterator<Item = (usize, &'a u64)>,
deleted_offset: Option<usize>,
- ) -> (UInt32BufferBuilder, UInt64BufferBuilder) {
- let mut input_indices = UInt32BufferBuilder::new(0);
- let mut match_indices = UInt64BufferBuilder::new(0);
+ ) -> (Vec<u32>, Vec<u64>) {
+ let mut input_indices = vec![];
+ let mut match_indices = vec![];
let hash_map = self.get_map();
let next_chain = self.get_list();
@@ -261,8 +261,8 @@ pub trait JoinHashMapType {
} else {
i
};
- match_indices.append(match_row_idx);
- input_indices.append(row_idx as u32);
+ match_indices.push(match_row_idx);
+ input_indices.push(row_idx as u32);
// Follow the chain to get the next index value
let next = next_chain[match_row_idx as usize];
if next == 0 {
@@ -289,13 +289,9 @@ pub trait JoinHashMapType {
deleted_offset: Option<usize>,
limit: usize,
offset: JoinHashMapOffset,
- ) -> (
- UInt32BufferBuilder,
- UInt64BufferBuilder,
- Option<JoinHashMapOffset>,
- ) {
- let mut input_indices = UInt32BufferBuilder::new(0);
- let mut match_indices = UInt64BufferBuilder::new(0);
+ ) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
+ let mut input_indices = vec![];
+ let mut match_indices = vec![];
let mut remaining_output = limit;
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 656d82215b..5a3fcb5029 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -38,9 +38,10 @@ use crate::sorts::streaming_merge;
use crate::stream::RecordBatchStreamAdapter;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics};
-use arrow::array::{ArrayRef, UInt64Builder};
-use arrow::datatypes::SchemaRef;
+use arrow::array::ArrayRef;
+use arrow::datatypes::{SchemaRef, UInt64Type};
use arrow::record_batch::RecordBatch;
+use arrow_array::PrimitiveArray;
use datafusion_common::utils::transpose;
use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, Result};
use datafusion_common_runtime::SpawnedTask;
@@ -275,12 +276,11 @@ impl BatchPartitioner {
create_hashes(&arrays, random_state, hash_buffer)?;
let mut indices: Vec<_> = (0..*partitions)
- .map(|_| UInt64Builder::with_capacity(batch.num_rows()))
+ .map(|_| Vec::with_capacity(batch.num_rows()))
.collect();
for (index, hash) in hash_buffer.iter().enumerate() {
- indices[(*hash % *partitions as u64) as usize]
- .append_value(index as u64);
+ indices[(*hash % *partitions as u64) as usize].push(index as u64);
}
// Finished building index-arrays for output partitions
@@ -291,8 +291,8 @@ impl BatchPartitioner {
let it = indices
.into_iter()
.enumerate()
- .filter_map(|(partition, mut indices)| {
- let indices = indices.finish();
+ .filter_map(|(partition, indices)| {
+ let indices: PrimitiveArray<UInt64Type> = indices.into();
(!indices.is_empty()).then_some((partition, indices))
})
.map(move |(partition, indices)| {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]