This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3de50c833b [minor] Use Vec instead of primitive builders (#12121)
3de50c833b is described below

commit 3de50c833b0e4bbb688b02240d11a1d5092c0d36
Author: Daniël Heres <[email protected]>
AuthorDate: Fri Aug 23 10:24:59 2024 +0200

    [minor] Use Vec instead of primitive builders (#12121)
    
    * Use vec instead of builder
    
    * Compile
    
    * Use vec instead of builder
    
    * Revert
---
 .../src/aggregate/groups_accumulator.rs            |  8 +++----
 datafusion/physical-plan/src/joins/hash_join.rs    | 25 ++++++++-------------
 .../physical-plan/src/joins/nested_loop_join.rs    | 25 +++++++++------------
 .../physical-plan/src/joins/symmetric_hash_join.rs | 10 ++++-----
 datafusion/physical-plan/src/joins/utils.rs        | 26 +++++++++-------------
 datafusion/physical-plan/src/repartition/mod.rs    | 14 ++++++------
 6 files changed, 46 insertions(+), 62 deletions(-)

diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
index 3984b02c5f..1c97d22ec7 100644
--- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
+++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator.rs
@@ -24,7 +24,7 @@ pub mod nulls;
 pub mod prim_op;
 
 use arrow::{
-    array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray, UInt32Builder},
+    array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray},
     compute,
     datatypes::UInt32Type,
 };
@@ -170,7 +170,7 @@ impl GroupsAccumulatorAdapter {
         let mut groups_with_rows = vec![];
 
         // batch_indices holds indices into values, each group is contiguous
-        let mut batch_indices = UInt32Builder::with_capacity(0);
+        let mut batch_indices = vec![];
 
         // offsets[i] is index into batch_indices where the rows for
         // group_index i starts
@@ -184,11 +184,11 @@ impl GroupsAccumulatorAdapter {
             }
 
             groups_with_rows.push(group_index);
-            batch_indices.append_slice(indices);
+            batch_indices.extend_from_slice(indices);
             offset_so_far += indices.len();
             offsets.push(offset_so_far);
         }
-        let batch_indices = batch_indices.finish();
+        let batch_indices = batch_indices.into();
 
         // reorder the values and opt_filter by batch_indices so that
         // all values for each group are contiguous, then invoke the
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index e40a07cf62..7fac23ad55 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -49,8 +49,7 @@ use crate::{
 };
 
 use arrow::array::{
-    Array, ArrayRef, BooleanArray, BooleanBufferBuilder, PrimitiveArray, 
UInt32Array,
-    UInt64Array,
+    Array, ArrayRef, BooleanArray, BooleanBufferBuilder, UInt32Array, 
UInt64Array,
 };
 use arrow::compute::kernels::cmp::{eq, not_distinct};
 use arrow::compute::{and, concat_batches, take, FilterBuilder};
@@ -1204,13 +1203,11 @@ fn lookup_join_hashmap(
         })
         .collect::<Result<Vec<_>>>()?;
 
-    let (mut probe_builder, mut build_builder, next_offset) = build_hashmap
+    let (probe_indices, build_indices, next_offset) = build_hashmap
         .get_matched_indices_with_limit_offset(hashes_buffer, None, limit, 
offset);
 
-    let build_indices: UInt64Array =
-        PrimitiveArray::new(build_builder.finish().into(), None);
-    let probe_indices: UInt32Array =
-        PrimitiveArray::new(probe_builder.finish().into(), None);
+    let build_indices: UInt64Array = build_indices.into();
+    let probe_indices: UInt32Array = probe_indices.into();
 
     let (build_indices, probe_indices) = equal_rows_arr(
         &build_indices,
@@ -1566,7 +1563,7 @@ mod tests {
         test::build_table_i32, test::exec::MockExec,
     };
 
-    use arrow::array::{Date32Array, Int32Array, UInt32Builder, UInt64Builder};
+    use arrow::array::{Date32Array, Int32Array};
     use arrow::datatypes::{DataType, Field};
     use arrow_array::StructArray;
     use arrow_buffer::NullBuffer;
@@ -3169,17 +3166,13 @@ mod tests {
             (0, None),
         )?;
 
-        let mut left_ids = UInt64Builder::with_capacity(0);
-        left_ids.append_value(0);
-        left_ids.append_value(1);
+        let left_ids: UInt64Array = vec![0, 1].into();
 
-        let mut right_ids = UInt32Builder::with_capacity(0);
-        right_ids.append_value(0);
-        right_ids.append_value(1);
+        let right_ids: UInt32Array = vec![0, 1].into();
 
-        assert_eq!(left_ids.finish(), l);
+        assert_eq!(left_ids, l);
 
-        assert_eq!(right_ids.finish(), r);
+        assert_eq!(right_ids, r);
 
         Ok(())
     }
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 04a025c932..18de2de031 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -40,13 +40,12 @@ use crate::{
     RecordBatchStream, SendableRecordBatchStream,
 };
 
-use arrow::array::{
-    BooleanBufferBuilder, UInt32Array, UInt32Builder, UInt64Array, 
UInt64Builder,
-};
+use arrow::array::{BooleanBufferBuilder, UInt32Array, UInt64Array};
 use arrow::compute::concat_batches;
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::{Schema, SchemaRef, UInt64Type};
 use arrow::record_batch::RecordBatch;
 use arrow::util::bit_util;
+use arrow_array::PrimitiveArray;
 use datafusion_common::{exec_datafusion_err, JoinSide, Result, Statistics};
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::TaskContext;
@@ -573,23 +572,21 @@ fn join_left_and_right_batch(
             )
         })?;
 
-    let mut left_indices_builder = UInt64Builder::new();
-    let mut right_indices_builder = UInt32Builder::new();
+    let mut left_indices_builder: Vec<u64> = vec![];
+    let mut right_indices_builder: Vec<u32> = vec![];
     for (left_side, right_side) in indices {
-        left_indices_builder
-            .append_values(left_side.values(), &vec![true; left_side.len()]);
-        right_indices_builder
-            .append_values(right_side.values(), &vec![true; right_side.len()]);
+        left_indices_builder.extend(left_side.values());
+        right_indices_builder.extend(right_side.values());
     }
 
-    let left_side = left_indices_builder.finish();
-    let right_side = right_indices_builder.finish();
+    let left_side: PrimitiveArray<UInt64Type> = left_indices_builder.into();
+    let right_side = right_indices_builder.into();
     // set the left bitmap
     // and only full join need the left bitmap
     if need_produce_result_in_final(join_type) {
         let mut bitmap = visited_left_side.lock();
-        left_side.iter().flatten().for_each(|x| {
-            bitmap.set_bit(x as usize, true);
+        left_side.values().iter().for_each(|x| {
+            bitmap.set_bit(*x as usize, true);
         });
     }
     // adjust the two side indices base on the join type
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 1bf2ef2fd5..7dab664502 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -929,13 +929,11 @@ fn lookup_join_hashmap(
     let (mut matched_probe, mut matched_build) = build_hashmap
         .get_matched_indices(hash_values.iter().enumerate().rev(), 
deleted_offset);
 
-    matched_probe.as_slice_mut().reverse();
-    matched_build.as_slice_mut().reverse();
+    matched_probe.reverse();
+    matched_build.reverse();
 
-    let build_indices: UInt64Array =
-        PrimitiveArray::new(matched_build.finish().into(), None);
-    let probe_indices: UInt32Array =
-        PrimitiveArray::new(matched_probe.finish().into(), None);
+    let build_indices: UInt64Array = matched_build.into();
+    let probe_indices: UInt32Array = matched_probe.into();
 
     let (build_indices, probe_indices) = equal_rows_arr(
         &build_indices,
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index 80d8815bde..8fdbf7041e 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -31,7 +31,7 @@ use crate::{
 
 use arrow::array::{
     downcast_array, new_null_array, Array, BooleanBufferBuilder, UInt32Array,
-    UInt32BufferBuilder, UInt32Builder, UInt64Array, UInt64BufferBuilder,
+    UInt32Builder, UInt64Array,
 };
 use arrow::compute;
 use arrow::datatypes::{Field, Schema, SchemaBuilder, UInt32Type, UInt64Type};
@@ -163,8 +163,8 @@ macro_rules! chain_traverse {
             } else {
                 i
             };
-            $match_indices.append(match_row_idx);
-            $input_indices.append($input_idx as u32);
+            $match_indices.push(match_row_idx);
+            $input_indices.push($input_idx as u32);
             $remaining_output -= 1;
             // Follow the chain to get the next index value
             let next = $next_chain[match_row_idx as usize];
@@ -238,9 +238,9 @@ pub trait JoinHashMapType {
         &self,
         iter: impl Iterator<Item = (usize, &'a u64)>,
         deleted_offset: Option<usize>,
-    ) -> (UInt32BufferBuilder, UInt64BufferBuilder) {
-        let mut input_indices = UInt32BufferBuilder::new(0);
-        let mut match_indices = UInt64BufferBuilder::new(0);
+    ) -> (Vec<u32>, Vec<u64>) {
+        let mut input_indices = vec![];
+        let mut match_indices = vec![];
 
         let hash_map = self.get_map();
         let next_chain = self.get_list();
@@ -261,8 +261,8 @@ pub trait JoinHashMapType {
                     } else {
                         i
                     };
-                    match_indices.append(match_row_idx);
-                    input_indices.append(row_idx as u32);
+                    match_indices.push(match_row_idx);
+                    input_indices.push(row_idx as u32);
                     // Follow the chain to get the next index value
                     let next = next_chain[match_row_idx as usize];
                     if next == 0 {
@@ -289,13 +289,9 @@ pub trait JoinHashMapType {
         deleted_offset: Option<usize>,
         limit: usize,
         offset: JoinHashMapOffset,
-    ) -> (
-        UInt32BufferBuilder,
-        UInt64BufferBuilder,
-        Option<JoinHashMapOffset>,
-    ) {
-        let mut input_indices = UInt32BufferBuilder::new(0);
-        let mut match_indices = UInt64BufferBuilder::new(0);
+    ) -> (Vec<u32>, Vec<u64>, Option<JoinHashMapOffset>) {
+        let mut input_indices = vec![];
+        let mut match_indices = vec![];
 
         let mut remaining_output = limit;
 
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 656d82215b..5a3fcb5029 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -38,9 +38,10 @@ use crate::sorts::streaming_merge;
 use crate::stream::RecordBatchStreamAdapter;
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, 
Statistics};
 
-use arrow::array::{ArrayRef, UInt64Builder};
-use arrow::datatypes::SchemaRef;
+use arrow::array::ArrayRef;
+use arrow::datatypes::{SchemaRef, UInt64Type};
 use arrow::record_batch::RecordBatch;
+use arrow_array::PrimitiveArray;
 use datafusion_common::utils::transpose;
 use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, 
Result};
 use datafusion_common_runtime::SpawnedTask;
@@ -275,12 +276,11 @@ impl BatchPartitioner {
                     create_hashes(&arrays, random_state, hash_buffer)?;
 
                     let mut indices: Vec<_> = (0..*partitions)
-                        .map(|_| 
UInt64Builder::with_capacity(batch.num_rows()))
+                        .map(|_| Vec::with_capacity(batch.num_rows()))
                         .collect();
 
                     for (index, hash) in hash_buffer.iter().enumerate() {
-                        indices[(*hash % *partitions as u64) as usize]
-                            .append_value(index as u64);
+                        indices[(*hash % *partitions as u64) as 
usize].push(index as u64);
                     }
 
                     // Finished building index-arrays for output partitions
@@ -291,8 +291,8 @@ impl BatchPartitioner {
                     let it = indices
                         .into_iter()
                         .enumerate()
-                        .filter_map(|(partition, mut indices)| {
-                            let indices = indices.finish();
+                        .filter_map(|(partition, indices)| {
+                            let indices: PrimitiveArray<UInt64Type> = 
indices.into();
                             (!indices.is_empty()).then_some((partition, 
indices))
                         })
                         .map(move |(partition, indices)| {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to