This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6553fafc5c [Minor] Remove redundant member from RepartitionExec (#12638)
6553fafc5c is described below

commit 6553fafc5c320e13fb177977b5a78a91c2ac2fc1
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Sep 27 13:55:56 2024 -0700

    [Minor] Remove redundant member from RepartitionExec (#12638)
    
    * Initial commit
    
    * Use util get_array_ref
    
    * Resolve linter errors
---
 datafusion/physical-plan/src/repartition/mod.rs | 46 ++++++++++---------------
 1 file changed, 19 insertions(+), 27 deletions(-)

diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 093803e3c8..10f898b26a 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -38,12 +38,11 @@ use crate::sorts::streaming_merge;
 use crate::stream::RecordBatchStreamAdapter;
 use crate::{DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, 
Statistics};
 
-use arrow::array::ArrayRef;
-use arrow::datatypes::{SchemaRef, UInt64Type};
+use arrow::datatypes::{SchemaRef, UInt32Type};
 use arrow::record_batch::RecordBatch;
 use arrow_array::{PrimitiveArray, RecordBatchOptions};
-use datafusion_common::utils::transpose;
-use datafusion_common::{arrow_datafusion_err, not_impl_err, DataFusionError, 
Result};
+use datafusion_common::utils::{get_arrayref_at_indices, transpose};
+use datafusion_common::{not_impl_err, DataFusionError, Result};
 use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::memory_pool::MemoryConsumer;
 use datafusion_execution::TaskContext;
@@ -280,7 +279,7 @@ impl BatchPartitioner {
                         .collect();
 
                     for (index, hash) in hash_buffer.iter().enumerate() {
-                        indices[(*hash % *partitions as u64) as usize].push(index as u64);
+                        indices[(*hash % *partitions as u64) as usize].push(index as u32);
                     }
 
                     // Finished building index-arrays for output partitions
@@ -292,7 +291,7 @@ impl BatchPartitioner {
                         .into_iter()
                         .enumerate()
                         .filter_map(|(partition, indices)| {
-                            let indices: PrimitiveArray<UInt64Type> = indices.into();
+                            let indices: PrimitiveArray<UInt32Type> = indices.into();
                             (!indices.is_empty()).then_some((partition, 
indices))
                         })
                         .map(move |(partition, indices)| {
@@ -300,14 +299,8 @@ impl BatchPartitioner {
                             let _timer = partitioner_timer.timer();
 
                             // Produce batches based on indices
-                            let columns = batch
-                                .columns()
-                                .iter()
-                                .map(|c| {
-                                    arrow::compute::take(c.as_ref(), &indices, 
None)
-                                        .map_err(|e| arrow_datafusion_err!(e))
-                                })
-                                .collect::<Result<Vec<ArrayRef>>>()?;
+                            let columns =
+                                get_arrayref_at_indices(batch.columns(), 
&indices)?;
 
                             let mut options = RecordBatchOptions::new();
                             options = 
options.with_row_count(Some(indices.len()));
@@ -403,8 +396,6 @@ impl BatchPartitioner {
 pub struct RepartitionExec {
     /// Input execution plan
     input: Arc<dyn ExecutionPlan>,
-    /// Partitioning scheme to use
-    partitioning: Partitioning,
     /// Inner state that is initialized when the first output stream is 
created.
     state: LazyState,
     /// Execution metrics
@@ -469,7 +460,7 @@ impl RepartitionExec {
 
     /// Partitioning scheme to use
     pub fn partitioning(&self) -> &Partitioning {
-        &self.partitioning
+        &self.cache.partitioning
     }
 
     /// Get preserve_order flag of the RepartitionExecutor
@@ -496,7 +487,7 @@ impl DisplayAs for RepartitionExec {
                     f,
                     "{}: partitioning={}, input_partitions={}",
                     self.name(),
-                    self.partitioning,
+                    self.partitioning(),
                     self.input.output_partitioning().partition_count()
                 )?;
 
@@ -539,8 +530,10 @@ impl ExecutionPlan for RepartitionExec {
         self: Arc<Self>,
         mut children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let mut repartition =
-            RepartitionExec::try_new(children.swap_remove(0), 
self.partitioning.clone())?;
+        let mut repartition = RepartitionExec::try_new(
+            children.swap_remove(0),
+            self.partitioning().clone(),
+        )?;
         if self.preserve_order {
             repartition = repartition.with_preserve_order();
         }
@@ -548,7 +541,7 @@ impl ExecutionPlan for RepartitionExec {
     }
 
     fn benefits_from_input_partitioning(&self) -> Vec<bool> {
-        vec![matches!(self.partitioning, Partitioning::Hash(_, _))]
+        vec![matches!(self.partitioning(), Partitioning::Hash(_, _))]
     }
 
     fn maintains_input_order(&self) -> Vec<bool> {
@@ -568,7 +561,7 @@ impl ExecutionPlan for RepartitionExec {
 
         let lazy_state = Arc::clone(&self.state);
         let input = Arc::clone(&self.input);
-        let partitioning = self.partitioning.clone();
+        let partitioning = self.partitioning().clone();
         let metrics = self.metrics.clone();
         let preserve_order = self.preserve_order;
         let name = self.name().to_owned();
@@ -687,7 +680,6 @@ impl RepartitionExec {
             Self::compute_properties(&input, partitioning.clone(), 
preserve_order);
         Ok(RepartitionExec {
             input,
-            partitioning,
             state: Default::default(),
             metrics: ExecutionPlanMetricsSet::new(),
             preserve_order,
@@ -1027,10 +1019,10 @@ mod tests {
         {collect, expressions::col, memory::MemoryExec},
     };
 
-    use arrow::array::{StringArray, UInt32Array};
+    use arrow::array::{ArrayRef, StringArray, UInt32Array};
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::cast::as_string_array;
-    use datafusion_common::{assert_batches_sorted_eq, exec_err};
+    use datafusion_common::{arrow_datafusion_err, assert_batches_sorted_eq, 
exec_err};
     use datafusion_execution::runtime_env::RuntimeEnvBuilder;
 
     use tokio::task::JoinSet;
@@ -1134,7 +1126,7 @@ mod tests {
 
         // execute and collect results
         let mut output_partitions = vec![];
-        for i in 0..exec.partitioning.partition_count() {
+        for i in 0..exec.partitioning().partition_count() {
             // execute this *output* partition and collect all batches
             let mut stream = exec.execute(i, Arc::clone(&task_ctx))?;
             let mut batches = vec![];
@@ -1524,7 +1516,7 @@ mod tests {
         let exec = RepartitionExec::try_new(Arc::new(exec), partitioning)?;
 
         // pull partitions
-        for i in 0..exec.partitioning.partition_count() {
+        for i in 0..exec.partitioning().partition_count() {
             let mut stream = exec.execute(i, Arc::clone(&task_ctx))?;
             let err =
                 
arrow_datafusion_err!(stream.next().await.unwrap().unwrap_err().into());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to