This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 75c7578  refine test in repartition.rs & coalesce_batches.rs (#1707)
75c7578 is described below

commit 75c7578cd2d510c0814742fc78f7745ca6873c3f
Author: xudong.w <[email protected]>
AuthorDate: Sun Jan 30 21:11:17 2022 +0800

    refine test in repartition.rs & coalesce_batches.rs (#1707)
---
 datafusion/src/physical_plan/coalesce_batches.rs | 20 +-------------------
 datafusion/src/physical_plan/mod.rs              |  6 ++----
 datafusion/src/physical_plan/planner.rs          |  4 ++--
 datafusion/src/physical_plan/repartition.rs      | 22 +++-------------------
 datafusion/src/test/mod.rs                       | 20 ++++++++++++++++++++
 5 files changed, 28 insertions(+), 44 deletions(-)

diff --git a/datafusion/src/physical_plan/coalesce_batches.rs b/datafusion/src/physical_plan/coalesce_batches.rs
index 586b052..ec238ad 100644
--- a/datafusion/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/src/physical_plan/coalesce_batches.rs
@@ -295,9 +295,8 @@ pub fn concat_batches(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::from_slice::FromSlice;
     use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
-    use arrow::array::UInt32Array;
+    use crate::test::create_vec_batches;
     use arrow::datatypes::{DataType, Field, Schema};
 
     #[tokio::test(flavor = "multi_thread")]
@@ -325,23 +324,6 @@ mod tests {
         Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
     }
 
-    fn create_vec_batches(schema: &Arc<Schema>, num_batches: usize) -> Vec<RecordBatch> {
-        let batch = create_batch(schema);
-        let mut vec = Vec::with_capacity(num_batches);
-        for _ in 0..num_batches {
-            vec.push(batch.clone());
-        }
-        vec
-    }
-
-    fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
-        RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
-        )
-        .unwrap()
-    }
-
     async fn coalesce_batches(
         schema: &SchemaRef,
         input_partitions: Vec<Vec<RecordBatch>>,
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 216d4a6..24aa6ad 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -59,7 +59,7 @@ pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync
 /// EmptyRecordBatchStream can be used to create a RecordBatchStream
 /// that will produce no results
 pub struct EmptyRecordBatchStream {
-    /// Schema
+    /// Schema wrapped by Arc
     schema: SchemaRef,
 }
 
@@ -384,9 +384,7 @@ impl Partitioning {
     pub fn partition_count(&self) -> usize {
         use Partitioning::*;
         match self {
-            RoundRobinBatch(n) => *n,
-            Hash(_, n) => *n,
-            UnknownPartitioning(n) => *n,
+            RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
         }
     }
 }
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 2dcde9d..226e3f3 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -226,9 +226,9 @@ pub trait PhysicalPlanner {
     ///
     /// `expr`: the expression to convert
     ///
-    /// `input_dfschema`: the logical plan schema for evaluating `e`
+    /// `input_dfschema`: the logical plan schema for evaluating `expr`
     ///
-    /// `input_schema`: the physical schema for evaluating `e`
+    /// `input_schema`: the physical schema for evaluating `expr`
     fn create_physical_expr(
         &self,
         expr: &Expr,
diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index 7460754..8686672 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -447,7 +447,7 @@ struct RepartitionStream {
     /// Number of input partitions that have finished sending batches to this output channel
     num_input_partitions_processed: usize,
 
-    /// Schema
+    /// Schema wrapped by Arc
     schema: SchemaRef,
 
     /// channel containing the repartitioned batches
@@ -494,6 +494,7 @@ impl RecordBatchStream for RepartitionStream {
 mod tests {
     use super::*;
     use crate::from_slice::FromSlice;
+    use crate::test::create_vec_batches;
     use crate::{
         assert_batches_sorted_eq,
         physical_plan::{collect, expressions::col, memory::MemoryExec},
@@ -508,7 +509,7 @@ mod tests {
     use arrow::datatypes::{DataType, Field, Schema};
     use arrow::record_batch::RecordBatch;
     use arrow::{
-        array::{ArrayRef, StringArray, UInt32Array},
+        array::{ArrayRef, StringArray},
         error::ArrowError,
     };
     use futures::FutureExt;
@@ -601,23 +602,6 @@ mod tests {
         Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
     }
 
-    fn create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
-        let batch = create_batch(schema);
-        let mut vec = Vec::with_capacity(n);
-        for _ in 0..n {
-            vec.push(batch.clone());
-        }
-        vec
-    }
-
-    fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
-        RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
-        )
-        .unwrap()
-    }
-
     async fn repartition(
         schema: &SchemaRef,
         input_partitions: Vec<Vec<RecordBatch>>,
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index 497bfe5..cebd9ee 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -17,6 +17,7 @@
 
 //! Common unit test utility methods
 
+use crate::arrow::array::UInt32Array;
 use crate::datasource::object_store::local::local_unpartitioned_file;
 use crate::datasource::{MemTable, PartitionedFile, TableProvider};
 use crate::error::Result;
@@ -212,6 +213,25 @@ pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send
     assert!(poll.is_pending());
 }
 
+/// Create vector batches
+pub fn create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
+    let batch = create_batch(schema);
+    let mut vec = Vec::with_capacity(n);
+    for _ in 0..n {
+        vec.push(batch.clone());
+    }
+    vec
+}
+
+/// Create batch
+fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
+    RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
+    )
+    .unwrap()
+}
+
 pub mod exec;
 pub mod object_store;
 pub mod user_defined;

Reply via email to