This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 75c7578 refine test in repartition.rs & coalesce_batches.rs (#1707)
75c7578 is described below
commit 75c7578cd2d510c0814742fc78f7745ca6873c3f
Author: xudong.w <[email protected]>
AuthorDate: Sun Jan 30 21:11:17 2022 +0800
refine test in repartition.rs & coalesce_batches.rs (#1707)
---
datafusion/src/physical_plan/coalesce_batches.rs | 20 +-------------------
datafusion/src/physical_plan/mod.rs | 6 ++----
datafusion/src/physical_plan/planner.rs | 4 ++--
datafusion/src/physical_plan/repartition.rs | 22 +++-------------------
datafusion/src/test/mod.rs | 20 ++++++++++++++++++++
5 files changed, 28 insertions(+), 44 deletions(-)
diff --git a/datafusion/src/physical_plan/coalesce_batches.rs b/datafusion/src/physical_plan/coalesce_batches.rs
index 586b052..ec238ad 100644
--- a/datafusion/src/physical_plan/coalesce_batches.rs
+++ b/datafusion/src/physical_plan/coalesce_batches.rs
@@ -295,9 +295,8 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
- use crate::from_slice::FromSlice;
use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
- use arrow::array::UInt32Array;
+ use crate::test::create_vec_batches;
use arrow::datatypes::{DataType, Field, Schema};
#[tokio::test(flavor = "multi_thread")]
@@ -325,23 +324,6 @@ mod tests {
Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
}
- fn create_vec_batches(schema: &Arc<Schema>, num_batches: usize) -> Vec<RecordBatch> {
- let batch = create_batch(schema);
- let mut vec = Vec::with_capacity(num_batches);
- for _ in 0..num_batches {
- vec.push(batch.clone());
- }
- vec
- }
-
- fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
- RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
- )
- .unwrap()
- }
-
async fn coalesce_batches(
schema: &SchemaRef,
input_partitions: Vec<Vec<RecordBatch>>,
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 216d4a6..24aa6ad 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -59,7 +59,7 @@ pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync
/// EmptyRecordBatchStream can be used to create a RecordBatchStream
/// that will produce no results
pub struct EmptyRecordBatchStream {
- /// Schema
+ /// Schema wrapped by Arc
schema: SchemaRef,
}
@@ -384,9 +384,7 @@ impl Partitioning {
pub fn partition_count(&self) -> usize {
use Partitioning::*;
match self {
- RoundRobinBatch(n) => *n,
- Hash(_, n) => *n,
- UnknownPartitioning(n) => *n,
+ RoundRobinBatch(n) | Hash(_, n) | UnknownPartitioning(n) => *n,
}
}
}
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index 2dcde9d..226e3f3 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -226,9 +226,9 @@ pub trait PhysicalPlanner {
///
/// `expr`: the expression to convert
///
- /// `input_dfschema`: the logical plan schema for evaluating `e`
+ /// `input_dfschema`: the logical plan schema for evaluating `expr`
///
- /// `input_schema`: the physical schema for evaluating `e`
+ /// `input_schema`: the physical schema for evaluating `expr`
fn create_physical_expr(
&self,
expr: &Expr,
diff --git a/datafusion/src/physical_plan/repartition.rs b/datafusion/src/physical_plan/repartition.rs
index 7460754..8686672 100644
--- a/datafusion/src/physical_plan/repartition.rs
+++ b/datafusion/src/physical_plan/repartition.rs
@@ -447,7 +447,7 @@ struct RepartitionStream {
/// Number of input partitions that have finished sending batches to this output channel
num_input_partitions_processed: usize,
- /// Schema
+ /// Schema wrapped by Arc
schema: SchemaRef,
/// channel containing the repartitioned batches
@@ -494,6 +494,7 @@ impl RecordBatchStream for RepartitionStream {
mod tests {
use super::*;
use crate::from_slice::FromSlice;
+ use crate::test::create_vec_batches;
use crate::{
assert_batches_sorted_eq,
physical_plan::{collect, expressions::col, memory::MemoryExec},
@@ -508,7 +509,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use arrow::{
- array::{ArrayRef, StringArray, UInt32Array},
+ array::{ArrayRef, StringArray},
error::ArrowError,
};
use futures::FutureExt;
@@ -601,23 +602,6 @@ mod tests {
Arc::new(Schema::new(vec![Field::new("c0", DataType::UInt32, false)]))
}
- fn create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
- let batch = create_batch(schema);
- let mut vec = Vec::with_capacity(n);
- for _ in 0..n {
- vec.push(batch.clone());
- }
- vec
- }
-
- fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
- RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
- )
- .unwrap()
- }
-
async fn repartition(
schema: &SchemaRef,
input_partitions: Vec<Vec<RecordBatch>>,
diff --git a/datafusion/src/test/mod.rs b/datafusion/src/test/mod.rs
index 497bfe5..cebd9ee 100644
--- a/datafusion/src/test/mod.rs
+++ b/datafusion/src/test/mod.rs
@@ -17,6 +17,7 @@
//! Common unit test utility methods
+use crate::arrow::array::UInt32Array;
use crate::datasource::object_store::local::local_unpartitioned_file;
use crate::datasource::{MemTable, PartitionedFile, TableProvider};
use crate::error::Result;
@@ -212,6 +213,25 @@ pub fn assert_is_pending<'a, T>(fut: &mut Pin<Box<dyn Future<Output = T> + Send
assert!(poll.is_pending());
}
+/// Create vector batches
+pub fn create_vec_batches(schema: &Arc<Schema>, n: usize) -> Vec<RecordBatch> {
+ let batch = create_batch(schema);
+ let mut vec = Vec::with_capacity(n);
+ for _ in 0..n {
+ vec.push(batch.clone());
+ }
+ vec
+}
+
+/// Create batch
+fn create_batch(schema: &Arc<Schema>) -> RecordBatch {
+ RecordBatch::try_new(
+ schema.clone(),
+ vec![Arc::new(UInt32Array::from_slice(&[1, 2, 3, 4, 5, 6, 7, 8]))],
+ )
+ .unwrap()
+}
+
pub mod exec;
pub mod object_store;
pub mod user_defined;