This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 69e5382aaa Improve DataFrame functional tests (#8630)
69e5382aaa is described below
commit 69e5382aaac8dff6b163de68abc8a46f8780791a
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Dec 23 13:41:59 2023 -0500
Improve DataFrame functional tests (#8630)
---
datafusion/core/src/dataframe/mod.rs | 220 +++++++++++++----------------------
1 file changed, 82 insertions(+), 138 deletions(-)
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 4b8a9c5b7d..2ae4a7c21a 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -1356,15 +1356,30 @@ mod tests {
use arrow::array::{self, Int32Array};
use arrow::datatypes::DataType;
- use datafusion_common::{Constraint, Constraints, ScalarValue};
+ use datafusion_common::{Constraint, Constraints};
use datafusion_expr::{
avg, cast, count, count_distinct, create_udf, expr, lit, max, min, sum,
- BinaryExpr, BuiltInWindowFunction, Operator, ScalarFunctionImplementation,
- Volatility, WindowFrame, WindowFunction,
+ BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFrame,
+ WindowFunction,
};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_plan::get_plan_string;
+ // Get string representation of the plan
+ async fn assert_physical_plan(df: &DataFrame, expected: Vec<&str>) {
+ let physical_plan = df
+ .clone()
+ .create_physical_plan()
+ .await
+ .expect("Error creating physical plan");
+
+ let actual = get_plan_string(&physical_plan);
+ assert_eq!(
+ expected, actual,
+ "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+ );
+ }
+
pub fn table_with_constraints() -> Arc<dyn TableProvider> {
let dual_schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
@@ -1587,47 +1602,36 @@ mod tests {
let config = SessionConfig::new().with_target_partitions(1);
let ctx = SessionContext::new_with_config(config);
- let table1 = table_with_constraints();
- let df = ctx.read_table(table1)?;
- let col_id = Expr::Column(datafusion_common::Column {
- relation: None,
- name: "id".to_string(),
- });
- let col_name = Expr::Column(datafusion_common::Column {
- relation: None,
- name: "name".to_string(),
- });
+ let df = ctx.read_table(table_with_constraints())?;
- // group by contains id column
- let group_expr = vec![col_id.clone()];
+ // GROUP BY id
+ let group_expr = vec![col("id")];
let aggr_expr = vec![];
let df = df.aggregate(group_expr, aggr_expr)?;
- // expr list contains id, name
- let expr_list = vec![col_id, col_name];
- let df = df.select(expr_list)?;
- let physical_plan = df.clone().create_physical_plan().await?;
- let expected = vec![
- "AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- ];
- // Get string representation of the plan
- let actual = get_plan_string(&physical_plan);
- assert_eq!(
- expected, actual,
- "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- // Since id and name are functionally dependant, we can use name among expression
- // even if it is not part of the group by expression.
- let df_results = collect(physical_plan, ctx.task_ctx()).await?;
+ // Since id and name are functionally dependant, we can use name among
+ // expression even if it is not part of the group by expression and can
+ // select "name" column even though it wasn't explicitly grouped
+ let df = df.select(vec![col("id"), col("name")])?;
+ assert_physical_plan(
+ &df,
+ vec![
+ "AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]",
+ " MemoryExec: partitions=1, partition_sizes=[1]",
+ ],
+ )
+ .await;
+
+ let df_results = df.collect().await?;
#[rustfmt::skip]
- assert_batches_sorted_eq!(
- ["+----+------+",
+ assert_batches_sorted_eq!([
+ "+----+------+",
"| id | name |",
"+----+------+",
"| 1 | a |",
- "+----+------+",],
+ "+----+------+"
+ ],
&df_results
);
@@ -1640,57 +1644,31 @@ mod tests {
let config = SessionConfig::new().with_target_partitions(1);
let ctx = SessionContext::new_with_config(config);
- let table1 = table_with_constraints();
- let df = ctx.read_table(table1)?;
- let col_id = Expr::Column(datafusion_common::Column {
- relation: None,
- name: "id".to_string(),
- });
- let col_name = Expr::Column(datafusion_common::Column {
- relation: None,
- name: "name".to_string(),
- });
+ let df = ctx.read_table(table_with_constraints())?;
- // group by contains id column
- let group_expr = vec![col_id.clone()];
+ // GROUP BY id
+ let group_expr = vec![col("id")];
let aggr_expr = vec![];
let df = df.aggregate(group_expr, aggr_expr)?;
- let condition1 = Expr::BinaryExpr(BinaryExpr::new(
- Box::new(col_id.clone()),
- Operator::Eq,
- Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
- ));
- let condition2 = Expr::BinaryExpr(BinaryExpr::new(
- Box::new(col_name),
- Operator::Eq,
- Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
- ));
- // Predicate refers to id, and name fields
- let predicate = Expr::BinaryExpr(BinaryExpr::new(
- Box::new(condition1),
- Operator::And,
- Box::new(condition2),
- ));
+ // Predicate refers to id, and name fields:
+ // id = 1 AND name = 'a'
+ let predicate = col("id").eq(lit(1i32)).and(col("name").eq(lit("a")));
let df = df.filter(predicate)?;
- let physical_plan = df.clone().create_physical_plan().await?;
-
- let expected = vec![
+ assert_physical_plan(
+ &df,
+ vec![
"CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: id@0 = 1 AND name@1 = a",
" AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]",
" MemoryExec: partitions=1, partition_sizes=[1]",
- ];
- // Get string representation of the plan
- let actual = get_plan_string(&physical_plan);
- assert_eq!(
- expected, actual,
- "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
+ ],
+ )
+ .await;
// Since id and name are functionally dependant, we can use name among expression
// even if it is not part of the group by expression.
- let df_results = collect(physical_plan, ctx.task_ctx()).await?;
+ let df_results = df.collect().await?;
#[rustfmt::skip]
assert_batches_sorted_eq!(
@@ -1711,53 +1689,35 @@ mod tests {
let config = SessionConfig::new().with_target_partitions(1);
let ctx = SessionContext::new_with_config(config);
- let table1 = table_with_constraints();
- let df = ctx.read_table(table1)?;
- let col_id = Expr::Column(datafusion_common::Column {
- relation: None,
- name: "id".to_string(),
- });
- let col_name = Expr::Column(datafusion_common::Column {
- relation: None,
- name: "name".to_string(),
- });
+ let df = ctx.read_table(table_with_constraints())?;
- // group by contains id column
- let group_expr = vec![col_id.clone()];
+ // GROUP BY id
+ let group_expr = vec![col("id")];
let aggr_expr = vec![];
// group by id,
let df = df.aggregate(group_expr, aggr_expr)?;
- let condition1 = Expr::BinaryExpr(BinaryExpr::new(
- Box::new(col_id.clone()),
- Operator::Eq,
- Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
- ));
// Predicate refers to id field
- let predicate = condition1;
- // id=0
+ // id = 1
+ let predicate = col("id").eq(lit(1i32));
let df = df.filter(predicate)?;
// Select expression refers to id, and name columns.
// id, name
- let df = df.select(vec![col_id.clone(), col_name.clone()])?;
- let physical_plan = df.clone().create_physical_plan().await?;
-
- let expected = vec![
+ let df = df.select(vec![col("id"), col("name")])?;
+ assert_physical_plan(
+ &df,
+ vec![
"CoalesceBatchesExec: target_batch_size=8192",
" FilterExec: id@0 = 1",
" AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], aggr=[]",
" MemoryExec: partitions=1, partition_sizes=[1]",
- ];
- // Get string representation of the plan
- let actual = get_plan_string(&physical_plan);
- assert_eq!(
- expected, actual,
- "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
+ ],
+ )
+ .await;
// Since id and name are functionally dependant, we can use name among expression
// even if it is not part of the group by expression.
- let df_results = collect(physical_plan, ctx.task_ctx()).await?;
+ let df_results = df.collect().await?;
#[rustfmt::skip]
assert_batches_sorted_eq!(
@@ -1778,51 +1738,35 @@ mod tests {
let config = SessionConfig::new().with_target_partitions(1);
let ctx = SessionContext::new_with_config(config);
- let table1 = table_with_constraints();
- let df = ctx.read_table(table1)?;
- let col_id = Expr::Column(datafusion_common::Column {
- relation: None,
- name: "id".to_string(),
- });
+ let df = ctx.read_table(table_with_constraints())?;
- // group by contains id column
- let group_expr = vec![col_id.clone()];
+ // GROUP BY id
+ let group_expr = vec![col("id")];
let aggr_expr = vec![];
- // group by id,
let df = df.aggregate(group_expr, aggr_expr)?;
- let condition1 = Expr::BinaryExpr(BinaryExpr::new(
- Box::new(col_id.clone()),
- Operator::Eq,
- Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
- ));
// Predicate refers to id field
- let predicate = condition1;
- // id=1
+ // id = 1
+ let predicate = col("id").eq(lit(1i32));
let df = df.filter(predicate)?;
// Select expression refers to id column.
// id
- let df = df.select(vec![col_id.clone()])?;
- let physical_plan = df.clone().create_physical_plan().await?;
+ let df = df.select(vec![col("id")])?;
// In this case aggregate shouldn't be expanded, since these
// columns are not used.
- let expected = vec![
- "CoalesceBatchesExec: target_batch_size=8192",
- " FilterExec: id@0 = 1",
- " AggregateExec: mode=Single, gby=[id@0 as id], aggr=[]",
- " MemoryExec: partitions=1, partition_sizes=[1]",
- ];
- // Get string representation of the plan
- let actual = get_plan_string(&physical_plan);
- assert_eq!(
- expected, actual,
- "\n**Optimized Plan Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
+ assert_physical_plan(
+ &df,
+ vec![
+ "CoalesceBatchesExec: target_batch_size=8192",
+ " FilterExec: id@0 = 1",
+ " AggregateExec: mode=Single, gby=[id@0 as id], aggr=[]",
+ " MemoryExec: partitions=1, partition_sizes=[1]",
+ ],
+ )
+ .await;
- // Since id and name are functionally dependant, we can use name among expression
- // even if it is not part of the group by expression.
- let df_results = collect(physical_plan, ctx.task_ctx()).await?;
+ let df_results = df.collect().await?;
#[rustfmt::skip]
assert_batches_sorted_eq!(