vbarua commented on code in PR #12462:
URL: https://github.com/apache/datafusion/pull/12462#discussion_r1759620572


##########
datafusion/substrait/tests/cases/consumer_integration.rs:
##########
@@ -24,569 +24,435 @@
 
 #[cfg(test)]
 mod tests {
+    use crate::utils::test::add_plan_schemas_to_ctx;
     use datafusion::common::Result;
-    use datafusion::execution::options::CsvReadOptions;
     use datafusion::prelude::SessionContext;
     use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
     use std::fs::File;
     use std::io::BufReader;
     use substrait::proto::Plan;
 
-    async fn create_context(files: Vec<(&str, &str)>) -> 
Result<SessionContext> {
-        let ctx = SessionContext::new();
-        for (table_name, file_path) in files {
-            ctx.register_csv(table_name, file_path, CsvReadOptions::default())
-                .await?;
-        }
-        Ok(ctx)
-    }
-    #[tokio::test]
-    async fn tpch_test_1() -> Result<()> {
-        let ctx = create_context(vec![(
-            "FILENAME_PLACEHOLDER_0",
-            "tests/testdata/tpch/lineitem.csv",
-        )])
-        .await?;
-        let path = "tests/testdata/tpch_substrait_plans/query_1.json";
+    async fn tpch_plan_to_string(query_id: i32) -> Result<String> {
+        let path =
+            
format!("tests/testdata/tpch_substrait_plans/query_{query_id:02}_plan.json");
         let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
             File::open(path).expect("file not found"),
         ))
         .expect("failed to parse json");
 
+        let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto)?;
         let plan = from_substrait_plan(&ctx, &proto).await?;
+        Ok(format!("{}", plan))
+    }
 
-        let plan_str = format!("{}", plan);
+    #[tokio::test]
+    async fn tpch_test_01() -> Result<()> {
+        let plan_str = tpch_plan_to_string(1).await?;
         assert_eq!(
             plan_str,
-            "Projection: FILENAME_PLACEHOLDER_0.l_returnflag AS L_RETURNFLAG, 
FILENAME_PLACEHOLDER_0.l_linestatus AS L_LINESTATUS, 
sum(FILENAME_PLACEHOLDER_0.l_quantity) AS SUM_QTY, 
sum(FILENAME_PLACEHOLDER_0.l_extendedprice) AS SUM_BASE_PRICE, 
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - 
FILENAME_PLACEHOLDER_0.l_discount) AS SUM_DISC_PRICE, 
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - 
FILENAME_PLACEHOLDER_0.l_discount * Int32(1) + FILENAME_PLACEHOLDER_0.l_tax) AS 
SUM_CHARGE, avg(FILENAME_PLACEHOLDER_0.l_quantity) AS AVG_QTY, 
avg(FILENAME_PLACEHOLDER_0.l_extendedprice) AS AVG_PRICE, 
avg(FILENAME_PLACEHOLDER_0.l_discount) AS AVG_DISC, count(Int64(1)) AS 
COUNT_ORDER\
-             \n  Sort: FILENAME_PLACEHOLDER_0.l_returnflag ASC NULLS LAST, 
FILENAME_PLACEHOLDER_0.l_linestatus ASC NULLS LAST\
-             \n    Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.l_returnflag, 
FILENAME_PLACEHOLDER_0.l_linestatus]], 
aggr=[[sum(FILENAME_PLACEHOLDER_0.l_quantity), 
sum(FILENAME_PLACEHOLDER_0.l_extendedprice), 
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - 
FILENAME_PLACEHOLDER_0.l_discount), sum(FILENAME_PLACEHOLDER_0.l_extendedprice 
* Int32(1) - FILENAME_PLACEHOLDER_0.l_discount * Int32(1) + 
FILENAME_PLACEHOLDER_0.l_tax), avg(FILENAME_PLACEHOLDER_0.l_quantity), 
avg(FILENAME_PLACEHOLDER_0.l_extendedprice), 
avg(FILENAME_PLACEHOLDER_0.l_discount), count(Int64(1))]]\
-             \n      Projection: FILENAME_PLACEHOLDER_0.l_returnflag, 
FILENAME_PLACEHOLDER_0.l_linestatus, FILENAME_PLACEHOLDER_0.l_quantity, 
FILENAME_PLACEHOLDER_0.l_extendedprice, FILENAME_PLACEHOLDER_0.l_extendedprice 
* (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount), 
FILENAME_PLACEHOLDER_0.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) - 
FILENAME_PLACEHOLDER_0.l_discount) * (CAST(Int32(1) AS Decimal128(19, 0)) + 
FILENAME_PLACEHOLDER_0.l_tax), FILENAME_PLACEHOLDER_0.l_discount\
-             \n        Filter: FILENAME_PLACEHOLDER_0.l_shipdate <= 
Date32(\"1998-12-01\") - IntervalDayTime(\"IntervalDayTime { days: 120, 
milliseconds: 0 }\")\
-             \n          TableScan: FILENAME_PLACEHOLDER_0 
projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, 
l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, 
l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]"
+            "Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS, 
sum(LINEITEM.L_QUANTITY) AS SUM_QTY, sum(LINEITEM.L_EXTENDEDPRICE) AS 
SUM_BASE_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT) 
AS SUM_DISC_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - 
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX) AS SUM_CHARGE, 
avg(LINEITEM.L_QUANTITY) AS AVG_QTY, avg(LINEITEM.L_EXTENDEDPRICE) AS 
AVG_PRICE, avg(LINEITEM.L_DISCOUNT) AS AVG_DISC, count(Int64(1)) AS COUNT_ORDER\
+            \n  Sort: LINEITEM.L_RETURNFLAG ASC NULLS LAST, 
LINEITEM.L_LINESTATUS ASC NULLS LAST\
+            \n    Aggregate: groupBy=[[LINEITEM.L_RETURNFLAG, 
LINEITEM.L_LINESTATUS]], aggr=[[sum(LINEITEM.L_QUANTITY), 
sum(LINEITEM.L_EXTENDEDPRICE), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - 
LINEITEM.L_DISCOUNT), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - 
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX), avg(LINEITEM.L_QUANTITY), 
avg(LINEITEM.L_EXTENDEDPRICE), avg(LINEITEM.L_DISCOUNT), count(Int64(1))]]\
+            \n      Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS, 
LINEITEM.L_QUANTITY, LINEITEM.L_EXTENDEDPRICE, LINEITEM.L_EXTENDEDPRICE * 
(CAST(Int32(1) AS Decimal128(15, 2)) - LINEITEM.L_DISCOUNT), 
LINEITEM.L_EXTENDEDPRICE * (CAST(Int32(1) AS Decimal128(15, 2)) - 
LINEITEM.L_DISCOUNT) * (CAST(Int32(1) AS Decimal128(15, 2)) + LINEITEM.L_TAX), 
LINEITEM.L_DISCOUNT\
+            \n        Filter: LINEITEM.L_SHIPDATE <= Date32(\"1998-12-01\") - 
IntervalDayTime(\"IntervalDayTime { days: 0, milliseconds: 10368000 }\")\
+            \n          TableScan: LINEITEM projection=[L_ORDERKEY, L_PARTKEY, 
L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX, 
L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE, 
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]"
         );
         Ok(())
     }
 
     #[tokio::test]
-    async fn tpch_test_2() -> Result<()> {
-        let ctx = create_context(vec![
-            ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/part.csv"),
-            ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/supplier.csv"),
-            ("FILENAME_PLACEHOLDER_2", "tests/testdata/tpch/partsupp.csv"),
-            ("FILENAME_PLACEHOLDER_3", "tests/testdata/tpch/nation.csv"),
-            ("FILENAME_PLACEHOLDER_4", "tests/testdata/tpch/region.csv"),
-            ("FILENAME_PLACEHOLDER_5", "tests/testdata/tpch/partsupp.csv"),
-            ("FILENAME_PLACEHOLDER_6", "tests/testdata/tpch/supplier.csv"),
-            ("FILENAME_PLACEHOLDER_7", "tests/testdata/tpch/nation.csv"),
-            ("FILENAME_PLACEHOLDER_8", "tests/testdata/tpch/region.csv"),
-        ])
-        .await?;
-        let path = "tests/testdata/tpch_substrait_plans/query_2.json";
-        let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
-            File::open(path).expect("file not found"),
-        ))
-        .expect("failed to parse json");
-
-        let plan = from_substrait_plan(&ctx, &proto).await?;
-        let plan_str = format!("{}", plan);
+    async fn tpch_test_02() -> Result<()> {
+        let plan_str = tpch_plan_to_string(2).await?;
         assert_eq!(
             plan_str,
-            "Projection: FILENAME_PLACEHOLDER_1.s_acctbal AS S_ACCTBAL, 
FILENAME_PLACEHOLDER_1.s_name AS S_NAME, FILENAME_PLACEHOLDER_3.N_NAME, 
FILENAME_PLACEHOLDER_0.p_partkey AS P_PARTKEY, FILENAME_PLACEHOLDER_0.p_mfgr AS 
P_MFGR, FILENAME_PLACEHOLDER_1.s_address AS S_ADDRESS, 
FILENAME_PLACEHOLDER_1.s_phone AS S_PHONE, FILENAME_PLACEHOLDER_1.s_comment AS 
S_COMMENT\
-            \n  Limit: skip=0, fetch=100\
-            \n    Sort: FILENAME_PLACEHOLDER_1.s_acctbal DESC NULLS FIRST, 
FILENAME_PLACEHOLDER_3.N_NAME ASC NULLS LAST, FILENAME_PLACEHOLDER_1.s_name ASC 
NULLS LAST, FILENAME_PLACEHOLDER_0.p_partkey ASC NULLS LAST\
-            \n      Projection: FILENAME_PLACEHOLDER_1.s_acctbal, 
FILENAME_PLACEHOLDER_1.s_name, FILENAME_PLACEHOLDER_3.N_NAME, 
FILENAME_PLACEHOLDER_0.p_partkey, FILENAME_PLACEHOLDER_0.p_mfgr, 
FILENAME_PLACEHOLDER_1.s_address, FILENAME_PLACEHOLDER_1.s_phone, 
FILENAME_PLACEHOLDER_1.s_comment\
-            \n        Filter: FILENAME_PLACEHOLDER_0.p_partkey = 
FILENAME_PLACEHOLDER_2.ps_partkey AND FILENAME_PLACEHOLDER_1.s_suppkey = 
FILENAME_PLACEHOLDER_2.ps_suppkey AND FILENAME_PLACEHOLDER_0.p_size = Int32(15) 
AND FILENAME_PLACEHOLDER_0.p_type LIKE CAST(Utf8(\"%BRASS\") AS Utf8) AND 
FILENAME_PLACEHOLDER_1.s_nationkey = FILENAME_PLACEHOLDER_3.N_NATIONKEY AND 
FILENAME_PLACEHOLDER_3.N_REGIONKEY = FILENAME_PLACEHOLDER_4.R_REGIONKEY AND 
FILENAME_PLACEHOLDER_4.R_NAME = CAST(Utf8(\"EUROPE\") AS Utf8) AND 
FILENAME_PLACEHOLDER_2.ps_supplycost = (<subquery>)\
-            \n          Subquery:\
-            \n            Aggregate: groupBy=[[]], 
aggr=[[min(FILENAME_PLACEHOLDER_5.ps_supplycost)]]\
-            \n              Projection: FILENAME_PLACEHOLDER_5.ps_supplycost\
-            \n                Filter: FILENAME_PLACEHOLDER_5.ps_partkey = 
FILENAME_PLACEHOLDER_5.ps_partkey AND FILENAME_PLACEHOLDER_6.s_suppkey = 
FILENAME_PLACEHOLDER_5.ps_suppkey AND FILENAME_PLACEHOLDER_6.s_nationkey = 
FILENAME_PLACEHOLDER_7.N_NATIONKEY AND FILENAME_PLACEHOLDER_7.N_REGIONKEY = 
FILENAME_PLACEHOLDER_8.R_REGIONKEY AND FILENAME_PLACEHOLDER_8.R_NAME = 
CAST(Utf8(\"EUROPE\") AS Utf8)\
-            \n                  Inner Join:  Filter: Boolean(true)\

Review Comment:
   Note that the Inner Join with an always-true filter has been replaced by an
explicit CrossJoin, which is semantically equivalent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to