This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 69e5382aaa Improve DataFrame functional tests (#8630)
69e5382aaa is described below

commit 69e5382aaac8dff6b163de68abc8a46f8780791a
Author: Andrew Lamb <[email protected]>
AuthorDate: Sat Dec 23 13:41:59 2023 -0500

    Improve DataFrame functional tests (#8630)
---
 datafusion/core/src/dataframe/mod.rs | 220 +++++++++++++----------------------
 1 file changed, 82 insertions(+), 138 deletions(-)

diff --git a/datafusion/core/src/dataframe/mod.rs 
b/datafusion/core/src/dataframe/mod.rs
index 4b8a9c5b7d..2ae4a7c21a 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -1356,15 +1356,30 @@ mod tests {
 
     use arrow::array::{self, Int32Array};
     use arrow::datatypes::DataType;
-    use datafusion_common::{Constraint, Constraints, ScalarValue};
+    use datafusion_common::{Constraint, Constraints};
     use datafusion_expr::{
         avg, cast, count, count_distinct, create_udf, expr, lit, max, min, sum,
-        BinaryExpr, BuiltInWindowFunction, Operator, 
ScalarFunctionImplementation,
-        Volatility, WindowFrame, WindowFunction,
+        BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, 
WindowFrame,
+        WindowFunction,
     };
     use datafusion_physical_expr::expressions::Column;
     use datafusion_physical_plan::get_plan_string;
 
+    // Get string representation of the plan
+    async fn assert_physical_plan(df: &DataFrame, expected: Vec<&str>) {
+        let physical_plan = df
+            .clone()
+            .create_physical_plan()
+            .await
+            .expect("Error creating physical plan");
+
+        let actual = get_plan_string(&physical_plan);
+        assert_eq!(
+            expected, actual,
+            "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+        );
+    }
+
     pub fn table_with_constraints() -> Arc<dyn TableProvider> {
         let dual_schema = Arc::new(Schema::new(vec![
             Field::new("id", DataType::Int32, false),
@@ -1587,47 +1602,36 @@ mod tests {
         let config = SessionConfig::new().with_target_partitions(1);
         let ctx = SessionContext::new_with_config(config);
 
-        let table1 = table_with_constraints();
-        let df = ctx.read_table(table1)?;
-        let col_id = Expr::Column(datafusion_common::Column {
-            relation: None,
-            name: "id".to_string(),
-        });
-        let col_name = Expr::Column(datafusion_common::Column {
-            relation: None,
-            name: "name".to_string(),
-        });
+        let df = ctx.read_table(table_with_constraints())?;
 
-        // group by contains id column
-        let group_expr = vec![col_id.clone()];
+        // GROUP BY id
+        let group_expr = vec![col("id")];
         let aggr_expr = vec![];
         let df = df.aggregate(group_expr, aggr_expr)?;
 
-        // expr list contains id, name
-        let expr_list = vec![col_id, col_name];
-        let df = df.select(expr_list)?;
-        let physical_plan = df.clone().create_physical_plan().await?;
-        let expected = vec![
-            "AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], 
aggr=[]",
-            "  MemoryExec: partitions=1, partition_sizes=[1]",
-        ];
-        // Get string representation of the plan
-        let actual = get_plan_string(&physical_plan);
-        assert_eq!(
-            expected, actual,
-            "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-        // Since id and name are functionally dependant, we can use name among 
expression
-        // even if it is not part of the group by expression.
-        let df_results = collect(physical_plan, ctx.task_ctx()).await?;
+        // Since id and name are functionally dependant, we can use name among
+        // expression even if it is not part of the group by expression and can
+        // select "name" column even though it wasn't explicitly grouped
+        let df = df.select(vec![col("id"), col("name")])?;
+        assert_physical_plan(
+            &df,
+            vec![
+                "AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], 
aggr=[]",
+                "  MemoryExec: partitions=1, partition_sizes=[1]",
+            ],
+        )
+        .await;
+
+        let df_results = df.collect().await?;
 
         #[rustfmt::skip]
-        assert_batches_sorted_eq!(
-            ["+----+------+",
+        assert_batches_sorted_eq!([
+             "+----+------+",
              "| id | name |",
              "+----+------+",
              "| 1  | a    |",
-             "+----+------+",],
+             "+----+------+"
+            ],
             &df_results
         );
 
@@ -1640,57 +1644,31 @@ mod tests {
         let config = SessionConfig::new().with_target_partitions(1);
         let ctx = SessionContext::new_with_config(config);
 
-        let table1 = table_with_constraints();
-        let df = ctx.read_table(table1)?;
-        let col_id = Expr::Column(datafusion_common::Column {
-            relation: None,
-            name: "id".to_string(),
-        });
-        let col_name = Expr::Column(datafusion_common::Column {
-            relation: None,
-            name: "name".to_string(),
-        });
+        let df = ctx.read_table(table_with_constraints())?;
 
-        // group by contains id column
-        let group_expr = vec![col_id.clone()];
+        // GROUP BY id
+        let group_expr = vec![col("id")];
         let aggr_expr = vec![];
         let df = df.aggregate(group_expr, aggr_expr)?;
 
-        let condition1 = Expr::BinaryExpr(BinaryExpr::new(
-            Box::new(col_id.clone()),
-            Operator::Eq,
-            Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
-        ));
-        let condition2 = Expr::BinaryExpr(BinaryExpr::new(
-            Box::new(col_name),
-            Operator::Eq,
-            Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
-        ));
-        // Predicate refers to id, and name fields
-        let predicate = Expr::BinaryExpr(BinaryExpr::new(
-            Box::new(condition1),
-            Operator::And,
-            Box::new(condition2),
-        ));
+        // Predicate refers to id, and name fields:
+        // id = 1 AND name = 'a'
+        let predicate = col("id").eq(lit(1i32)).and(col("name").eq(lit("a")));
         let df = df.filter(predicate)?;
-        let physical_plan = df.clone().create_physical_plan().await?;
-
-        let expected = vec![
+        assert_physical_plan(
+            &df,
+            vec![
             "CoalesceBatchesExec: target_batch_size=8192",
             "  FilterExec: id@0 = 1 AND name@1 = a",
             "    AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], 
aggr=[]",
             "      MemoryExec: partitions=1, partition_sizes=[1]",
-        ];
-        // Get string representation of the plan
-        let actual = get_plan_string(&physical_plan);
-        assert_eq!(
-            expected, actual,
-            "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
+        ],
+        )
+        .await;
 
         // Since id and name are functionally dependant, we can use name among 
expression
         // even if it is not part of the group by expression.
-        let df_results = collect(physical_plan, ctx.task_ctx()).await?;
+        let df_results = df.collect().await?;
 
         #[rustfmt::skip]
         assert_batches_sorted_eq!(
@@ -1711,53 +1689,35 @@ mod tests {
         let config = SessionConfig::new().with_target_partitions(1);
         let ctx = SessionContext::new_with_config(config);
 
-        let table1 = table_with_constraints();
-        let df = ctx.read_table(table1)?;
-        let col_id = Expr::Column(datafusion_common::Column {
-            relation: None,
-            name: "id".to_string(),
-        });
-        let col_name = Expr::Column(datafusion_common::Column {
-            relation: None,
-            name: "name".to_string(),
-        });
+        let df = ctx.read_table(table_with_constraints())?;
 
-        // group by contains id column
-        let group_expr = vec![col_id.clone()];
+        // GROUP BY id
+        let group_expr = vec![col("id")];
         let aggr_expr = vec![];
         // group by id,
         let df = df.aggregate(group_expr, aggr_expr)?;
 
-        let condition1 = Expr::BinaryExpr(BinaryExpr::new(
-            Box::new(col_id.clone()),
-            Operator::Eq,
-            Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
-        ));
         // Predicate refers to id field
-        let predicate = condition1;
-        // id=0
+        // id = 1
+        let predicate = col("id").eq(lit(1i32));
         let df = df.filter(predicate)?;
         // Select expression refers to id, and name columns.
         // id, name
-        let df = df.select(vec![col_id.clone(), col_name.clone()])?;
-        let physical_plan = df.clone().create_physical_plan().await?;
-
-        let expected = vec![
+        let df = df.select(vec![col("id"), col("name")])?;
+        assert_physical_plan(
+            &df,
+            vec![
             "CoalesceBatchesExec: target_batch_size=8192",
             "  FilterExec: id@0 = 1",
             "    AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], 
aggr=[]",
             "      MemoryExec: partitions=1, partition_sizes=[1]",
-        ];
-        // Get string representation of the plan
-        let actual = get_plan_string(&physical_plan);
-        assert_eq!(
-            expected, actual,
-            "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
+        ],
+        )
+        .await;
 
         // Since id and name are functionally dependant, we can use name among 
expression
         // even if it is not part of the group by expression.
-        let df_results = collect(physical_plan, ctx.task_ctx()).await?;
+        let df_results = df.collect().await?;
 
         #[rustfmt::skip]
         assert_batches_sorted_eq!(
@@ -1778,51 +1738,35 @@ mod tests {
         let config = SessionConfig::new().with_target_partitions(1);
         let ctx = SessionContext::new_with_config(config);
 
-        let table1 = table_with_constraints();
-        let df = ctx.read_table(table1)?;
-        let col_id = Expr::Column(datafusion_common::Column {
-            relation: None,
-            name: "id".to_string(),
-        });
+        let df = ctx.read_table(table_with_constraints())?;
 
-        // group by contains id column
-        let group_expr = vec![col_id.clone()];
+        // GROUP BY id
+        let group_expr = vec![col("id")];
         let aggr_expr = vec![];
-        // group by id,
         let df = df.aggregate(group_expr, aggr_expr)?;
 
-        let condition1 = Expr::BinaryExpr(BinaryExpr::new(
-            Box::new(col_id.clone()),
-            Operator::Eq,
-            Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
-        ));
         // Predicate refers to id field
-        let predicate = condition1;
-        // id=1
+        // id = 1
+        let predicate = col("id").eq(lit(1i32));
         let df = df.filter(predicate)?;
         // Select expression refers to id column.
         // id
-        let df = df.select(vec![col_id.clone()])?;
-        let physical_plan = df.clone().create_physical_plan().await?;
+        let df = df.select(vec![col("id")])?;
 
         // In this case aggregate shouldn't be expanded, since these
         // columns are not used.
-        let expected = vec![
-            "CoalesceBatchesExec: target_batch_size=8192",
-            "  FilterExec: id@0 = 1",
-            "    AggregateExec: mode=Single, gby=[id@0 as id], aggr=[]",
-            "      MemoryExec: partitions=1, partition_sizes=[1]",
-        ];
-        // Get string representation of the plan
-        let actual = get_plan_string(&physical_plan);
-        assert_eq!(
-            expected, actual,
-            "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
+        assert_physical_plan(
+            &df,
+            vec![
+                "CoalesceBatchesExec: target_batch_size=8192",
+                "  FilterExec: id@0 = 1",
+                "    AggregateExec: mode=Single, gby=[id@0 as id], aggr=[]",
+                "      MemoryExec: partitions=1, partition_sizes=[1]",
+            ],
+        )
+        .await;
 
-        // Since id and name are functionally dependant, we can use name among 
expression
-        // even if it is not part of the group by expression.
-        let df_results = collect(physical_plan, ctx.task_ctx()).await?;
+        let df_results = df.collect().await?;
 
         #[rustfmt::skip]
         assert_batches_sorted_eq!(

Reply via email to