alamb commented on code in PR #12462:
URL: https://github.com/apache/datafusion/pull/12462#discussion_r1760036406
##########
datafusion/substrait/tests/cases/consumer_integration.rs:
##########
@@ -24,569 +24,435 @@
#[cfg(test)]
mod tests {
+ use crate::utils::test::add_plan_schemas_to_ctx;
use datafusion::common::Result;
- use datafusion::execution::options::CsvReadOptions;
use datafusion::prelude::SessionContext;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use std::fs::File;
use std::io::BufReader;
use substrait::proto::Plan;
- async fn create_context(files: Vec<(&str, &str)>) ->
Result<SessionContext> {
- let ctx = SessionContext::new();
- for (table_name, file_path) in files {
- ctx.register_csv(table_name, file_path, CsvReadOptions::default())
- .await?;
- }
- Ok(ctx)
- }
- #[tokio::test]
- async fn tpch_test_1() -> Result<()> {
- let ctx = create_context(vec![(
- "FILENAME_PLACEHOLDER_0",
- "tests/testdata/tpch/lineitem.csv",
- )])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_1.json";
+ async fn tpch_plan_to_string(query_id: i32) -> Result<String> {
+ let path =
+
format!("tests/testdata/tpch_substrait_plans/query_{query_id:02}_plan.json");
let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
File::open(path).expect("file not found"),
))
.expect("failed to parse json");
+ let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto)?;
let plan = from_substrait_plan(&ctx, &proto).await?;
+ Ok(format!("{}", plan))
+ }
- let plan_str = format!("{}", plan);
+ #[tokio::test]
+ async fn tpch_test_01() -> Result<()> {
+ let plan_str = tpch_plan_to_string(1).await?;
assert_eq!(
plan_str,
- "Projection: FILENAME_PLACEHOLDER_0.l_returnflag AS L_RETURNFLAG,
FILENAME_PLACEHOLDER_0.l_linestatus AS L_LINESTATUS,
sum(FILENAME_PLACEHOLDER_0.l_quantity) AS SUM_QTY,
sum(FILENAME_PLACEHOLDER_0.l_extendedprice) AS SUM_BASE_PRICE,
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_0.l_discount) AS SUM_DISC_PRICE,
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_0.l_discount * Int32(1) + FILENAME_PLACEHOLDER_0.l_tax) AS
SUM_CHARGE, avg(FILENAME_PLACEHOLDER_0.l_quantity) AS AVG_QTY,
avg(FILENAME_PLACEHOLDER_0.l_extendedprice) AS AVG_PRICE,
avg(FILENAME_PLACEHOLDER_0.l_discount) AS AVG_DISC, count(Int64(1)) AS
COUNT_ORDER\
- \n Sort: FILENAME_PLACEHOLDER_0.l_returnflag ASC NULLS LAST,
FILENAME_PLACEHOLDER_0.l_linestatus ASC NULLS LAST\
- \n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.l_returnflag,
FILENAME_PLACEHOLDER_0.l_linestatus]],
aggr=[[sum(FILENAME_PLACEHOLDER_0.l_quantity),
sum(FILENAME_PLACEHOLDER_0.l_extendedprice),
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_0.l_discount), sum(FILENAME_PLACEHOLDER_0.l_extendedprice
* Int32(1) - FILENAME_PLACEHOLDER_0.l_discount * Int32(1) +
FILENAME_PLACEHOLDER_0.l_tax), avg(FILENAME_PLACEHOLDER_0.l_quantity),
avg(FILENAME_PLACEHOLDER_0.l_extendedprice),
avg(FILENAME_PLACEHOLDER_0.l_discount), count(Int64(1))]]\
- \n Projection: FILENAME_PLACEHOLDER_0.l_returnflag,
FILENAME_PLACEHOLDER_0.l_linestatus, FILENAME_PLACEHOLDER_0.l_quantity,
FILENAME_PLACEHOLDER_0.l_extendedprice, FILENAME_PLACEHOLDER_0.l_extendedprice
* (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount),
FILENAME_PLACEHOLDER_0.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) -
FILENAME_PLACEHOLDER_0.l_discount) * (CAST(Int32(1) AS Decimal128(19, 0)) +
FILENAME_PLACEHOLDER_0.l_tax), FILENAME_PLACEHOLDER_0.l_discount\
- \n Filter: FILENAME_PLACEHOLDER_0.l_shipdate <=
Date32(\"1998-12-01\") - IntervalDayTime(\"IntervalDayTime { days: 120,
milliseconds: 0 }\")\
- \n TableScan: FILENAME_PLACEHOLDER_0
projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate,
l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]"
+ "Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS,
sum(LINEITEM.L_QUANTITY) AS SUM_QTY, sum(LINEITEM.L_EXTENDEDPRICE) AS
SUM_BASE_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT)
AS SUM_DISC_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX) AS SUM_CHARGE,
avg(LINEITEM.L_QUANTITY) AS AVG_QTY, avg(LINEITEM.L_EXTENDEDPRICE) AS
AVG_PRICE, avg(LINEITEM.L_DISCOUNT) AS AVG_DISC, count(Int64(1)) AS COUNT_ORDER\
Review Comment:
those certainly look nicer
##########
datafusion/substrait/tests/utils.rs:
##########
@@ -97,90 +127,362 @@ pub mod test {
},
};
- let substrait_schema = read
- .base_schema
- .as_ref()
- .expect("No base schema found for NamedTable");
+ let substrait_schema =
+ read.base_schema.as_ref().ok_or(substrait_datafusion_err!(
+ "No base schema found for NamedTable: {}",
+ table_reference
+ ))?;
let empty_extensions = Extensions {
functions: Default::default(),
types: Default::default(),
type_variations: Default::default(),
};
let df_schema =
- from_substrait_named_struct(substrait_schema,
&empty_extensions)
- .expect(
- "Unable to generate DataFusion schema from Substrait
NamedStruct",
- )
+ from_substrait_named_struct(substrait_schema,
&empty_extensions)?
.replace_qualifier(table_reference.clone());
let table = EmptyTable::new(df_schema.inner().clone());
self.schemas.push((table_reference, Arc::new(table)));
+ Ok(())
}
- fn collect_schemas_from_rel(&mut self, rel: &Rel) {
- match rel.rel_type.as_ref().unwrap() {
- RelType::Read(r) => match r.read_type.as_ref().unwrap() {
- // Virtual Tables do not contribute to the schema
- ReadType::VirtualTable(_) => (),
- ReadType::LocalFiles(_) => todo!(),
- ReadType::NamedTable(nt) => self.collect_named_table(r,
nt),
- ReadType::ExtensionTable(_) => todo!(),
- },
- RelType::Filter(f) => self.apply(f.input.as_ref().map(|b|
b.as_ref())),
- RelType::Fetch(f) => self.apply(f.input.as_ref().map(|b|
b.as_ref())),
- RelType::Aggregate(a) => self.apply(a.input.as_ref().map(|b|
b.as_ref())),
- RelType::Sort(s) => self.apply(s.input.as_ref().map(|b|
b.as_ref())),
+ fn collect_schemas_from_rel(&mut self, rel: &Rel) -> Result<()> {
Review Comment:
I wonder if
https://docs.rs/datafusion/latest/datafusion/common/tree_node/trait.TreeNode.html
could serve as inspiration. @peter-toth put a lot of effort into improving
that API, and I think it is pretty neat now.
##########
datafusion/substrait/tests/cases/consumer_integration.rs:
##########
@@ -24,569 +24,435 @@
#[cfg(test)]
mod tests {
+ use crate::utils::test::add_plan_schemas_to_ctx;
use datafusion::common::Result;
- use datafusion::execution::options::CsvReadOptions;
use datafusion::prelude::SessionContext;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use std::fs::File;
use std::io::BufReader;
use substrait::proto::Plan;
- async fn create_context(files: Vec<(&str, &str)>) ->
Result<SessionContext> {
- let ctx = SessionContext::new();
- for (table_name, file_path) in files {
- ctx.register_csv(table_name, file_path, CsvReadOptions::default())
- .await?;
- }
- Ok(ctx)
- }
- #[tokio::test]
- async fn tpch_test_1() -> Result<()> {
- let ctx = create_context(vec![(
- "FILENAME_PLACEHOLDER_0",
- "tests/testdata/tpch/lineitem.csv",
- )])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_1.json";
+ async fn tpch_plan_to_string(query_id: i32) -> Result<String> {
+ let path =
+
format!("tests/testdata/tpch_substrait_plans/query_{query_id:02}_plan.json");
let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
File::open(path).expect("file not found"),
))
.expect("failed to parse json");
+ let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto)?;
let plan = from_substrait_plan(&ctx, &proto).await?;
+ Ok(format!("{}", plan))
+ }
- let plan_str = format!("{}", plan);
+ #[tokio::test]
+ async fn tpch_test_01() -> Result<()> {
+ let plan_str = tpch_plan_to_string(1).await?;
assert_eq!(
plan_str,
- "Projection: FILENAME_PLACEHOLDER_0.l_returnflag AS L_RETURNFLAG,
FILENAME_PLACEHOLDER_0.l_linestatus AS L_LINESTATUS,
sum(FILENAME_PLACEHOLDER_0.l_quantity) AS SUM_QTY,
sum(FILENAME_PLACEHOLDER_0.l_extendedprice) AS SUM_BASE_PRICE,
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_0.l_discount) AS SUM_DISC_PRICE,
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_0.l_discount * Int32(1) + FILENAME_PLACEHOLDER_0.l_tax) AS
SUM_CHARGE, avg(FILENAME_PLACEHOLDER_0.l_quantity) AS AVG_QTY,
avg(FILENAME_PLACEHOLDER_0.l_extendedprice) AS AVG_PRICE,
avg(FILENAME_PLACEHOLDER_0.l_discount) AS AVG_DISC, count(Int64(1)) AS
COUNT_ORDER\
- \n Sort: FILENAME_PLACEHOLDER_0.l_returnflag ASC NULLS LAST,
FILENAME_PLACEHOLDER_0.l_linestatus ASC NULLS LAST\
- \n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.l_returnflag,
FILENAME_PLACEHOLDER_0.l_linestatus]],
aggr=[[sum(FILENAME_PLACEHOLDER_0.l_quantity),
sum(FILENAME_PLACEHOLDER_0.l_extendedprice),
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_0.l_discount), sum(FILENAME_PLACEHOLDER_0.l_extendedprice
* Int32(1) - FILENAME_PLACEHOLDER_0.l_discount * Int32(1) +
FILENAME_PLACEHOLDER_0.l_tax), avg(FILENAME_PLACEHOLDER_0.l_quantity),
avg(FILENAME_PLACEHOLDER_0.l_extendedprice),
avg(FILENAME_PLACEHOLDER_0.l_discount), count(Int64(1))]]\
- \n Projection: FILENAME_PLACEHOLDER_0.l_returnflag,
FILENAME_PLACEHOLDER_0.l_linestatus, FILENAME_PLACEHOLDER_0.l_quantity,
FILENAME_PLACEHOLDER_0.l_extendedprice, FILENAME_PLACEHOLDER_0.l_extendedprice
* (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount),
FILENAME_PLACEHOLDER_0.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) -
FILENAME_PLACEHOLDER_0.l_discount) * (CAST(Int32(1) AS Decimal128(19, 0)) +
FILENAME_PLACEHOLDER_0.l_tax), FILENAME_PLACEHOLDER_0.l_discount\
- \n Filter: FILENAME_PLACEHOLDER_0.l_shipdate <=
Date32(\"1998-12-01\") - IntervalDayTime(\"IntervalDayTime { days: 120,
milliseconds: 0 }\")\
- \n TableScan: FILENAME_PLACEHOLDER_0
projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate,
l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]"
+ "Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS,
sum(LINEITEM.L_QUANTITY) AS SUM_QTY, sum(LINEITEM.L_EXTENDEDPRICE) AS
SUM_BASE_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT)
AS SUM_DISC_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX) AS SUM_CHARGE,
avg(LINEITEM.L_QUANTITY) AS AVG_QTY, avg(LINEITEM.L_EXTENDEDPRICE) AS
AVG_PRICE, avg(LINEITEM.L_DISCOUNT) AS AVG_DISC, count(Int64(1)) AS COUNT_ORDER\
+ \n Sort: LINEITEM.L_RETURNFLAG ASC NULLS LAST,
LINEITEM.L_LINESTATUS ASC NULLS LAST\
+ \n Aggregate: groupBy=[[LINEITEM.L_RETURNFLAG,
LINEITEM.L_LINESTATUS]], aggr=[[sum(LINEITEM.L_QUANTITY),
sum(LINEITEM.L_EXTENDEDPRICE), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX), avg(LINEITEM.L_QUANTITY),
avg(LINEITEM.L_EXTENDEDPRICE), avg(LINEITEM.L_DISCOUNT), count(Int64(1))]]\
+ \n Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS,
LINEITEM.L_QUANTITY, LINEITEM.L_EXTENDEDPRICE, LINEITEM.L_EXTENDEDPRICE *
(CAST(Int32(1) AS Decimal128(15, 2)) - LINEITEM.L_DISCOUNT),
LINEITEM.L_EXTENDEDPRICE * (CAST(Int32(1) AS Decimal128(15, 2)) -
LINEITEM.L_DISCOUNT) * (CAST(Int32(1) AS Decimal128(15, 2)) + LINEITEM.L_TAX),
LINEITEM.L_DISCOUNT\
+ \n Filter: LINEITEM.L_SHIPDATE <= Date32(\"1998-12-01\") -
IntervalDayTime(\"IntervalDayTime { days: 0, milliseconds: 10368000 }\")\
+ \n TableScan: LINEITEM projection=[L_ORDERKEY, L_PARTKEY,
L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX,
L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]"
);
Ok(())
}
#[tokio::test]
- async fn tpch_test_2() -> Result<()> {
- let ctx = create_context(vec![
- ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/part.csv"),
- ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/supplier.csv"),
- ("FILENAME_PLACEHOLDER_2", "tests/testdata/tpch/partsupp.csv"),
- ("FILENAME_PLACEHOLDER_3", "tests/testdata/tpch/nation.csv"),
- ("FILENAME_PLACEHOLDER_4", "tests/testdata/tpch/region.csv"),
- ("FILENAME_PLACEHOLDER_5", "tests/testdata/tpch/partsupp.csv"),
- ("FILENAME_PLACEHOLDER_6", "tests/testdata/tpch/supplier.csv"),
- ("FILENAME_PLACEHOLDER_7", "tests/testdata/tpch/nation.csv"),
- ("FILENAME_PLACEHOLDER_8", "tests/testdata/tpch/region.csv"),
- ])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_2.json";
- let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
- File::open(path).expect("file not found"),
- ))
- .expect("failed to parse json");
-
- let plan = from_substrait_plan(&ctx, &proto).await?;
- let plan_str = format!("{}", plan);
+ async fn tpch_test_02() -> Result<()> {
+ let plan_str = tpch_plan_to_string(2).await?;
assert_eq!(
plan_str,
- "Projection: FILENAME_PLACEHOLDER_1.s_acctbal AS S_ACCTBAL,
FILENAME_PLACEHOLDER_1.s_name AS S_NAME, FILENAME_PLACEHOLDER_3.N_NAME,
FILENAME_PLACEHOLDER_0.p_partkey AS P_PARTKEY, FILENAME_PLACEHOLDER_0.p_mfgr AS
P_MFGR, FILENAME_PLACEHOLDER_1.s_address AS S_ADDRESS,
FILENAME_PLACEHOLDER_1.s_phone AS S_PHONE, FILENAME_PLACEHOLDER_1.s_comment AS
S_COMMENT\
- \n Limit: skip=0, fetch=100\
- \n Sort: FILENAME_PLACEHOLDER_1.s_acctbal DESC NULLS FIRST,
FILENAME_PLACEHOLDER_3.N_NAME ASC NULLS LAST, FILENAME_PLACEHOLDER_1.s_name ASC
NULLS LAST, FILENAME_PLACEHOLDER_0.p_partkey ASC NULLS LAST\
- \n Projection: FILENAME_PLACEHOLDER_1.s_acctbal,
FILENAME_PLACEHOLDER_1.s_name, FILENAME_PLACEHOLDER_3.N_NAME,
FILENAME_PLACEHOLDER_0.p_partkey, FILENAME_PLACEHOLDER_0.p_mfgr,
FILENAME_PLACEHOLDER_1.s_address, FILENAME_PLACEHOLDER_1.s_phone,
FILENAME_PLACEHOLDER_1.s_comment\
- \n Filter: FILENAME_PLACEHOLDER_0.p_partkey =
FILENAME_PLACEHOLDER_2.ps_partkey AND FILENAME_PLACEHOLDER_1.s_suppkey =
FILENAME_PLACEHOLDER_2.ps_suppkey AND FILENAME_PLACEHOLDER_0.p_size = Int32(15)
AND FILENAME_PLACEHOLDER_0.p_type LIKE CAST(Utf8(\"%BRASS\") AS Utf8) AND
FILENAME_PLACEHOLDER_1.s_nationkey = FILENAME_PLACEHOLDER_3.N_NATIONKEY AND
FILENAME_PLACEHOLDER_3.N_REGIONKEY = FILENAME_PLACEHOLDER_4.R_REGIONKEY AND
FILENAME_PLACEHOLDER_4.R_NAME = CAST(Utf8(\"EUROPE\") AS Utf8) AND
FILENAME_PLACEHOLDER_2.ps_supplycost = (<subquery>)\
- \n Subquery:\
- \n Aggregate: groupBy=[[]],
aggr=[[min(FILENAME_PLACEHOLDER_5.ps_supplycost)]]\
- \n Projection: FILENAME_PLACEHOLDER_5.ps_supplycost\
- \n Filter: FILENAME_PLACEHOLDER_5.ps_partkey =
FILENAME_PLACEHOLDER_5.ps_partkey AND FILENAME_PLACEHOLDER_6.s_suppkey =
FILENAME_PLACEHOLDER_5.ps_suppkey AND FILENAME_PLACEHOLDER_6.s_nationkey =
FILENAME_PLACEHOLDER_7.N_NATIONKEY AND FILENAME_PLACEHOLDER_7.N_REGIONKEY =
FILENAME_PLACEHOLDER_8.R_REGIONKEY AND FILENAME_PLACEHOLDER_8.R_NAME =
CAST(Utf8(\"EUROPE\") AS Utf8)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n TableScan: FILENAME_PLACEHOLDER_5
projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_6
projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal,
s_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_7
projection=[N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT]\
- \n TableScan: FILENAME_PLACEHOLDER_8
projection=[R_REGIONKEY, R_NAME, R_COMMENT]\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n TableScan: FILENAME_PLACEHOLDER_0
projection=[p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container,
p_retailprice, p_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_1
projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal,
s_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_2
projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_3
projection=[N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT]\
- \n TableScan: FILENAME_PLACEHOLDER_4
projection=[R_REGIONKEY, R_NAME, R_COMMENT]"
+ "Limit: skip=0, fetch=100\
+ \n Sort: SUPPLIER.S_ACCTBAL DESC NULLS FIRST, NATION.N_NAME ASC
NULLS LAST, SUPPLIER.S_NAME ASC NULLS LAST, PART.P_PARTKEY ASC NULLS LAST\
+ \n Projection: SUPPLIER.S_ACCTBAL, SUPPLIER.S_NAME,
NATION.N_NAME, PART.P_PARTKEY, PART.P_MFGR, SUPPLIER.S_ADDRESS,
SUPPLIER.S_PHONE, SUPPLIER.S_COMMENT\
+ \n Filter: PART.P_PARTKEY = PARTSUPP.PS_PARTKEY AND
SUPPLIER.S_SUPPKEY = PARTSUPP.PS_SUPPKEY AND PART.P_SIZE = Int32(15) AND
PART.P_TYPE LIKE CAST(Utf8(\"%BRASS\") AS Utf8) AND SUPPLIER.S_NATIONKEY =
NATION.N_NATIONKEY AND NATION.N_REGIONKEY = REGION.R_REGIONKEY AND
REGION.R_NAME = Utf8(\"EUROPE\") AND PARTSUPP.PS_SUPPLYCOST = (<subquery>)\
+ \n Subquery:\
+ \n Aggregate: groupBy=[[]],
aggr=[[min(PARTSUPP.PS_SUPPLYCOST)]]\
+ \n Projection: PARTSUPP.PS_SUPPLYCOST\
+ \n Filter: PARTSUPP.PS_PARTKEY = PARTSUPP.PS_PARTKEY
AND SUPPLIER.S_SUPPKEY = PARTSUPP.PS_SUPPKEY AND SUPPLIER.S_NATIONKEY =
NATION.N_NATIONKEY AND NATION.N_REGIONKEY = REGION.R_REGIONKEY AND
REGION.R_NAME = Utf8(\"EUROPE\")\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n TableScan: PARTSUPP
projection=[PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT]\
+ \n TableScan: SUPPLIER projection=[S_SUPPKEY,
S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT]\
+ \n TableScan: NATION projection=[N_NATIONKEY,
N_NAME, N_REGIONKEY, N_COMMENT]\
+ \n TableScan: REGION projection=[R_REGIONKEY,
R_NAME, R_COMMENT]\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n TableScan: PART projection=[P_PARTKEY, P_NAME,
P_MFGR, P_BRAND, P_TYPE, P_SIZE, P_CONTAINER, P_RETAILPRICE, P_COMMENT]\
+ \n TableScan: SUPPLIER projection=[S_SUPPKEY,
S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT]\
+ \n TableScan: PARTSUPP projection=[PS_PARTKEY,
PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT]\
+ \n TableScan: NATION projection=[N_NATIONKEY, N_NAME,
N_REGIONKEY, N_COMMENT]\
+ \n TableScan: REGION projection=[R_REGIONKEY, R_NAME,
R_COMMENT]"
);
Ok(())
}
#[tokio::test]
- async fn tpch_test_3() -> Result<()> {
- let ctx = create_context(vec![
- ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/customer.csv"),
- ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/orders.csv"),
- ("FILENAME_PLACEHOLDER_2", "tests/testdata/tpch/lineitem.csv"),
- ])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_3.json";
- let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
- File::open(path).expect("file not found"),
- ))
- .expect("failed to parse json");
+ async fn tpch_test_03() -> Result<()> {
+ let plan_str = tpch_plan_to_string(3).await?;
+ assert_eq!(
+ plan_str,
+ "Projection: LINEITEM.L_ORDERKEY, sum(LINEITEM.L_EXTENDEDPRICE *
Int32(1) - LINEITEM.L_DISCOUNT) AS REVENUE, ORDERS.O_ORDERDATE,
ORDERS.O_SHIPPRIORITY\
+ \n Limit: skip=0, fetch=10\
+ \n Sort: sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT) DESC NULLS FIRST, ORDERS.O_ORDERDATE ASC NULLS LAST\
+ \n Projection: LINEITEM.L_ORDERKEY,
sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT),
ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY\
+ \n Aggregate: groupBy=[[LINEITEM.L_ORDERKEY,
ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY]],
aggr=[[sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT)]]\
+ \n Projection: LINEITEM.L_ORDERKEY, ORDERS.O_ORDERDATE,
ORDERS.O_SHIPPRIORITY, LINEITEM.L_EXTENDEDPRICE * (CAST(Int32(1) AS
Decimal128(15, 2)) - LINEITEM.L_DISCOUNT)\
+ \n Filter: CUSTOMER.C_MKTSEGMENT = Utf8(\"BUILDING\")
AND CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY AND LINEITEM.L_ORDERKEY =
ORDERS.O_ORDERKEY AND ORDERS.O_ORDERDATE < CAST(Utf8(\"1995-03-15\") AS Date32)
AND LINEITEM.L_SHIPDATE > CAST(Utf8(\"1995-03-15\") AS Date32)\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n TableScan: LINEITEM projection=[L_ORDERKEY,
L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT,
L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]\
+ \n TableScan: CUSTOMER projection=[C_CUSTKEY,
C_NAME, C_ADDRESS, C_NATIONKEY, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT]\
+ \n TableScan: ORDERS projection=[O_ORDERKEY,
O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK,
O_SHIPPRIORITY, O_COMMENT]"
+ );
+ Ok(())
+ }
- let plan = from_substrait_plan(&ctx, &proto).await?;
- let plan_str = format!("{}", plan);
- assert_eq!(plan_str, "Projection: FILENAME_PLACEHOLDER_2.l_orderkey AS
L_ORDERKEY, sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount) AS REVENUE,
FILENAME_PLACEHOLDER_1.o_orderdate AS O_ORDERDATE,
FILENAME_PLACEHOLDER_1.o_shippriority AS O_SHIPPRIORITY\
- \n Limit: skip=0, fetch=10\
- \n Sort: sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount) DESC NULLS FIRST,
FILENAME_PLACEHOLDER_1.o_orderdate ASC NULLS LAST\
- \n Projection: FILENAME_PLACEHOLDER_2.l_orderkey,
sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount), FILENAME_PLACEHOLDER_1.o_orderdate,
FILENAME_PLACEHOLDER_1.o_shippriority\
- \n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_2.l_orderkey,
FILENAME_PLACEHOLDER_1.o_orderdate, FILENAME_PLACEHOLDER_1.o_shippriority]],
aggr=[[sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount)]]\
- \n Projection: FILENAME_PLACEHOLDER_2.l_orderkey,
FILENAME_PLACEHOLDER_1.o_orderdate, FILENAME_PLACEHOLDER_1.o_shippriority,
FILENAME_PLACEHOLDER_2.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) -
FILENAME_PLACEHOLDER_2.l_discount)\
- \n Filter: FILENAME_PLACEHOLDER_0.c_mktsegment =
CAST(Utf8(\"HOUSEHOLD\") AS Utf8) AND FILENAME_PLACEHOLDER_0.c_custkey =
FILENAME_PLACEHOLDER_1.o_custkey AND FILENAME_PLACEHOLDER_2.l_orderkey =
FILENAME_PLACEHOLDER_1.o_orderkey AND FILENAME_PLACEHOLDER_1.o_orderdate <
Date32(\"1995-03-25\") AND FILENAME_PLACEHOLDER_2.l_shipdate >
Date32(\"1995-03-25\")\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n TableScan: FILENAME_PLACEHOLDER_0
projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal,
c_mktsegment, c_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_1
projection=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate,
o_orderpriority, o_clerk, o_shippriority, o_comment]\n
TableScan: FILENAME_PLACEHOLDER_2 projection=[l_orderkey, l_partkey, l_suppkey,
l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,
l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct,
l_shipmode, l_comment]");
+ #[tokio::test]
+ async fn tpch_test_04() -> Result<()> {
+ let plan_str = tpch_plan_to_string(4).await?;
+ assert_eq!(
+ plan_str,
+ "Projection: ORDERS.O_ORDERPRIORITY, count(Int64(1)) AS
ORDER_COUNT\
+ \n Sort: ORDERS.O_ORDERPRIORITY ASC NULLS LAST\
+ \n Aggregate: groupBy=[[ORDERS.O_ORDERPRIORITY]],
aggr=[[count(Int64(1))]]\
+ \n Projection: ORDERS.O_ORDERPRIORITY\
+ \n Filter: ORDERS.O_ORDERDATE >= CAST(Utf8(\"1993-07-01\")
AS Date32) AND ORDERS.O_ORDERDATE < CAST(Utf8(\"1993-10-01\") AS Date32) AND
EXISTS (<subquery>)\
+ \n Subquery:\
+ \n Filter: LINEITEM.L_ORDERKEY = LINEITEM.L_ORDERKEY
AND LINEITEM.L_COMMITDATE < LINEITEM.L_RECEIPTDATE\
+ \n TableScan: LINEITEM projection=[L_ORDERKEY,
L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT,
L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]\
+ \n TableScan: ORDERS projection=[O_ORDERKEY, O_CUSTKEY,
O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK,
O_SHIPPRIORITY, O_COMMENT]"
+ );
Ok(())
}
#[tokio::test]
- async fn tpch_test_4() -> Result<()> {
- let ctx = create_context(vec![
- ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/orders.csv"),
- ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/lineitem.csv"),
- ])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_4.json";
- let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
- File::open(path).expect("file not found"),
- ))
- .expect("failed to parse json");
- let plan = from_substrait_plan(&ctx, &proto).await?;
- let plan_str = format!("{}", plan);
- assert_eq!(plan_str, "Projection:
FILENAME_PLACEHOLDER_0.o_orderpriority AS O_ORDERPRIORITY, count(Int64(1)) AS
ORDER_COUNT\
- \n Sort: FILENAME_PLACEHOLDER_0.o_orderpriority ASC NULLS LAST\
- \n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.o_orderpriority]],
aggr=[[count(Int64(1))]]\
- \n Projection: FILENAME_PLACEHOLDER_0.o_orderpriority\
- \n Filter: FILENAME_PLACEHOLDER_0.o_orderdate >=
CAST(Utf8(\"1993-07-01\") AS Date32) AND FILENAME_PLACEHOLDER_0.o_orderdate <
CAST(Utf8(\"1993-10-01\") AS Date32) AND EXISTS (<subquery>)\
- \n Subquery:\
- \n Filter: FILENAME_PLACEHOLDER_1.l_orderkey =
FILENAME_PLACEHOLDER_1.l_orderkey AND FILENAME_PLACEHOLDER_1.l_commitdate <
FILENAME_PLACEHOLDER_1.l_receiptdate\
- \n TableScan: FILENAME_PLACEHOLDER_1
projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate,
l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_0 projection=[o_orderkey,
o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk,
o_shippriority, o_comment]");
+ async fn tpch_test_05() -> Result<()> {
+ let plan_str = tpch_plan_to_string(5).await?;
+ assert_eq!(
+ plan_str,
+ "Projection: NATION.N_NAME, sum(LINEITEM.L_EXTENDEDPRICE *
Int32(1) - LINEITEM.L_DISCOUNT) AS REVENUE\
+ \n Sort: sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT) DESC NULLS FIRST\
+ \n Aggregate: groupBy=[[NATION.N_NAME]],
aggr=[[sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT)]]\
+ \n Projection: NATION.N_NAME, LINEITEM.L_EXTENDEDPRICE *
(CAST(Int32(1) AS Decimal128(15, 2)) - LINEITEM.L_DISCOUNT)\
+ \n Filter: CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY AND
LINEITEM.L_ORDERKEY = ORDERS.O_ORDERKEY AND LINEITEM.L_SUPPKEY =
SUPPLIER.S_SUPPKEY AND CUSTOMER.C_NATIONKEY = SUPPLIER.S_NATIONKEY AND
SUPPLIER.S_NATIONKEY = NATION.N_NATIONKEY AND NATION.N_REGIONKEY =
REGION.R_REGIONKEY AND REGION.R_NAME = Utf8(\"ASIA\") AND ORDERS.O_ORDERDATE >=
CAST(Utf8(\"1994-01-01\") AS Date32) AND ORDERS.O_ORDERDATE <
CAST(Utf8(\"1995-01-01\") AS Date32)\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n TableScan: CUSTOMER projection=[C_CUSTKEY,
C_NAME, C_ADDRESS, C_NATIONKEY, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT]\
+ \n TableScan: ORDERS projection=[O_ORDERKEY,
O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK,
O_SHIPPRIORITY, O_COMMENT]\
+ \n TableScan: LINEITEM projection=[L_ORDERKEY,
L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT,
L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]\
+ \n TableScan: SUPPLIER projection=[S_SUPPKEY,
S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT]\
+ \n TableScan: NATION projection=[N_NATIONKEY, N_NAME,
N_REGIONKEY, N_COMMENT]\
+ \n TableScan: REGION projection=[R_REGIONKEY, R_NAME,
R_COMMENT]"
+ );
Ok(())
}
#[tokio::test]
- async fn tpch_test_5() -> Result<()> {
- let ctx = create_context(vec![
- ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/customer.csv"),
- ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/orders.csv"),
- ("FILENAME_PLACEHOLDER_2", "tests/testdata/tpch/lineitem.csv"),
- ("FILENAME_PLACEHOLDER_3", "tests/testdata/tpch/supplier.csv"),
- ("NATION", "tests/testdata/tpch/nation.csv"),
- ("REGION", "tests/testdata/tpch/region.csv"),
- ])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_5.json";
- let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
- File::open(path).expect("file not found"),
- ))
- .expect("failed to parse json");
+ async fn tpch_test_06() -> Result<()> {
+ let plan_str = tpch_plan_to_string(6).await?;
+ assert_eq!(
+ plan_str,
+ "Aggregate: groupBy=[[]], aggr=[[sum(LINEITEM.L_EXTENDEDPRICE *
LINEITEM.L_DISCOUNT) AS REVENUE]]\
+ \n Projection: LINEITEM.L_EXTENDEDPRICE * LINEITEM.L_DISCOUNT\
+ \n Filter: LINEITEM.L_SHIPDATE >= CAST(Utf8(\"1994-01-01\") AS
Date32) AND LINEITEM.L_SHIPDATE < CAST(Utf8(\"1995-01-01\") AS Date32) AND
LINEITEM.L_DISCOUNT >= Decimal128(Some(5),3,2) AND LINEITEM.L_DISCOUNT <=
Decimal128(Some(7),3,2) AND LINEITEM.L_QUANTITY < CAST(Int32(24) AS
Decimal128(15, 2))\
+ \n TableScan: LINEITEM projection=[L_ORDERKEY, L_PARTKEY,
L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX,
L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]"
+ );
+ Ok(())
+ }
- let plan = from_substrait_plan(&ctx, &proto).await?;
- let plan_str = format!("{}", plan);
- assert_eq!(plan_str, "Projection: NATION.N_NAME,
sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount) AS REVENUE\
- \n Sort: sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount) DESC NULLS FIRST\
- \n Aggregate: groupBy=[[NATION.N_NAME]],
aggr=[[sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount)]]\
- \n Projection: NATION.N_NAME,
FILENAME_PLACEHOLDER_2.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) -
FILENAME_PLACEHOLDER_2.l_discount)\
- \n Filter: FILENAME_PLACEHOLDER_0.c_custkey =
FILENAME_PLACEHOLDER_1.o_custkey AND FILENAME_PLACEHOLDER_2.l_orderkey =
FILENAME_PLACEHOLDER_1.o_orderkey AND FILENAME_PLACEHOLDER_2.l_suppkey =
FILENAME_PLACEHOLDER_3.s_suppkey AND FILENAME_PLACEHOLDER_0.c_nationkey =
FILENAME_PLACEHOLDER_3.s_nationkey AND FILENAME_PLACEHOLDER_3.s_nationkey =
NATION.N_NATIONKEY AND NATION.N_REGIONKEY = REGION.R_REGIONKEY AND
REGION.R_NAME = CAST(Utf8(\"ASIA\") AS Utf8) AND
FILENAME_PLACEHOLDER_1.o_orderdate >= CAST(Utf8(\"1994-01-01\") AS Date32) AND
FILENAME_PLACEHOLDER_1.o_orderdate < CAST(Utf8(\"1995-01-01\") AS Date32)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n TableScan: FILENAME_PLACEHOLDER_0
projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal,
c_mktsegment, c_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_1
projection=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate,
o_orderpriority, o_clerk, o_shippriority, o_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_2
projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate,
l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_3
projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal,
s_comment]\
- \n TableScan: NATION projection=[N_NATIONKEY, N_NAME,
N_REGIONKEY, N_COMMENT]\
- \n TableScan: REGION projection=[R_REGIONKEY, R_NAME,
R_COMMENT]");
+ #[ignore]
+ #[tokio::test]
Review Comment:
For other reviewers: these plans weren't included in the initial coverage
either.
##########
datafusion/substrait/tests/cases/consumer_integration.rs:
##########
@@ -24,569 +24,435 @@
#[cfg(test)]
mod tests {
+ use crate::utils::test::add_plan_schemas_to_ctx;
use datafusion::common::Result;
- use datafusion::execution::options::CsvReadOptions;
use datafusion::prelude::SessionContext;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
use std::fs::File;
use std::io::BufReader;
use substrait::proto::Plan;
- async fn create_context(files: Vec<(&str, &str)>) ->
Result<SessionContext> {
- let ctx = SessionContext::new();
- for (table_name, file_path) in files {
- ctx.register_csv(table_name, file_path, CsvReadOptions::default())
- .await?;
- }
- Ok(ctx)
- }
- #[tokio::test]
- async fn tpch_test_1() -> Result<()> {
- let ctx = create_context(vec![(
- "FILENAME_PLACEHOLDER_0",
- "tests/testdata/tpch/lineitem.csv",
- )])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_1.json";
+ async fn tpch_plan_to_string(query_id: i32) -> Result<String> {
+ let path =
+
format!("tests/testdata/tpch_substrait_plans/query_{query_id:02}_plan.json");
let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
File::open(path).expect("file not found"),
))
.expect("failed to parse json");
+ let ctx = add_plan_schemas_to_ctx(SessionContext::new(), &proto)?;
let plan = from_substrait_plan(&ctx, &proto).await?;
+ Ok(format!("{}", plan))
+ }
- let plan_str = format!("{}", plan);
+ #[tokio::test]
+ async fn tpch_test_01() -> Result<()> {
+ let plan_str = tpch_plan_to_string(1).await?;
assert_eq!(
plan_str,
- "Projection: FILENAME_PLACEHOLDER_0.l_returnflag AS L_RETURNFLAG,
FILENAME_PLACEHOLDER_0.l_linestatus AS L_LINESTATUS,
sum(FILENAME_PLACEHOLDER_0.l_quantity) AS SUM_QTY,
sum(FILENAME_PLACEHOLDER_0.l_extendedprice) AS SUM_BASE_PRICE,
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_0.l_discount) AS SUM_DISC_PRICE,
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_0.l_discount * Int32(1) + FILENAME_PLACEHOLDER_0.l_tax) AS
SUM_CHARGE, avg(FILENAME_PLACEHOLDER_0.l_quantity) AS AVG_QTY,
avg(FILENAME_PLACEHOLDER_0.l_extendedprice) AS AVG_PRICE,
avg(FILENAME_PLACEHOLDER_0.l_discount) AS AVG_DISC, count(Int64(1)) AS
COUNT_ORDER\
- \n Sort: FILENAME_PLACEHOLDER_0.l_returnflag ASC NULLS LAST,
FILENAME_PLACEHOLDER_0.l_linestatus ASC NULLS LAST\
- \n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.l_returnflag,
FILENAME_PLACEHOLDER_0.l_linestatus]],
aggr=[[sum(FILENAME_PLACEHOLDER_0.l_quantity),
sum(FILENAME_PLACEHOLDER_0.l_extendedprice),
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_0.l_discount), sum(FILENAME_PLACEHOLDER_0.l_extendedprice
* Int32(1) - FILENAME_PLACEHOLDER_0.l_discount * Int32(1) +
FILENAME_PLACEHOLDER_0.l_tax), avg(FILENAME_PLACEHOLDER_0.l_quantity),
avg(FILENAME_PLACEHOLDER_0.l_extendedprice),
avg(FILENAME_PLACEHOLDER_0.l_discount), count(Int64(1))]]\
- \n Projection: FILENAME_PLACEHOLDER_0.l_returnflag,
FILENAME_PLACEHOLDER_0.l_linestatus, FILENAME_PLACEHOLDER_0.l_quantity,
FILENAME_PLACEHOLDER_0.l_extendedprice, FILENAME_PLACEHOLDER_0.l_extendedprice
* (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount),
FILENAME_PLACEHOLDER_0.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) -
FILENAME_PLACEHOLDER_0.l_discount) * (CAST(Int32(1) AS Decimal128(19, 0)) +
FILENAME_PLACEHOLDER_0.l_tax), FILENAME_PLACEHOLDER_0.l_discount\
- \n Filter: FILENAME_PLACEHOLDER_0.l_shipdate <=
Date32(\"1998-12-01\") - IntervalDayTime(\"IntervalDayTime { days: 120,
milliseconds: 0 }\")\
- \n TableScan: FILENAME_PLACEHOLDER_0
projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate,
l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]"
+ "Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS,
sum(LINEITEM.L_QUANTITY) AS SUM_QTY, sum(LINEITEM.L_EXTENDEDPRICE) AS
SUM_BASE_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT)
AS SUM_DISC_PRICE, sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX) AS SUM_CHARGE,
avg(LINEITEM.L_QUANTITY) AS AVG_QTY, avg(LINEITEM.L_EXTENDEDPRICE) AS
AVG_PRICE, avg(LINEITEM.L_DISCOUNT) AS AVG_DISC, count(Int64(1)) AS COUNT_ORDER\
+ \n Sort: LINEITEM.L_RETURNFLAG ASC NULLS LAST,
LINEITEM.L_LINESTATUS ASC NULLS LAST\
+ \n Aggregate: groupBy=[[LINEITEM.L_RETURNFLAG,
LINEITEM.L_LINESTATUS]], aggr=[[sum(LINEITEM.L_QUANTITY),
sum(LINEITEM.L_EXTENDEDPRICE), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT * Int32(1) + LINEITEM.L_TAX), avg(LINEITEM.L_QUANTITY),
avg(LINEITEM.L_EXTENDEDPRICE), avg(LINEITEM.L_DISCOUNT), count(Int64(1))]]\
+ \n Projection: LINEITEM.L_RETURNFLAG, LINEITEM.L_LINESTATUS,
LINEITEM.L_QUANTITY, LINEITEM.L_EXTENDEDPRICE, LINEITEM.L_EXTENDEDPRICE *
(CAST(Int32(1) AS Decimal128(15, 2)) - LINEITEM.L_DISCOUNT),
LINEITEM.L_EXTENDEDPRICE * (CAST(Int32(1) AS Decimal128(15, 2)) -
LINEITEM.L_DISCOUNT) * (CAST(Int32(1) AS Decimal128(15, 2)) + LINEITEM.L_TAX),
LINEITEM.L_DISCOUNT\
+ \n Filter: LINEITEM.L_SHIPDATE <= Date32(\"1998-12-01\") -
IntervalDayTime(\"IntervalDayTime { days: 0, milliseconds: 10368000 }\")\
+ \n TableScan: LINEITEM projection=[L_ORDERKEY, L_PARTKEY,
L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX,
L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]"
);
Ok(())
}
#[tokio::test]
- async fn tpch_test_2() -> Result<()> {
- let ctx = create_context(vec![
- ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/part.csv"),
- ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/supplier.csv"),
- ("FILENAME_PLACEHOLDER_2", "tests/testdata/tpch/partsupp.csv"),
- ("FILENAME_PLACEHOLDER_3", "tests/testdata/tpch/nation.csv"),
- ("FILENAME_PLACEHOLDER_4", "tests/testdata/tpch/region.csv"),
- ("FILENAME_PLACEHOLDER_5", "tests/testdata/tpch/partsupp.csv"),
- ("FILENAME_PLACEHOLDER_6", "tests/testdata/tpch/supplier.csv"),
- ("FILENAME_PLACEHOLDER_7", "tests/testdata/tpch/nation.csv"),
- ("FILENAME_PLACEHOLDER_8", "tests/testdata/tpch/region.csv"),
- ])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_2.json";
- let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
- File::open(path).expect("file not found"),
- ))
- .expect("failed to parse json");
-
- let plan = from_substrait_plan(&ctx, &proto).await?;
- let plan_str = format!("{}", plan);
+ async fn tpch_test_02() -> Result<()> {
+ let plan_str = tpch_plan_to_string(2).await?;
assert_eq!(
plan_str,
- "Projection: FILENAME_PLACEHOLDER_1.s_acctbal AS S_ACCTBAL,
FILENAME_PLACEHOLDER_1.s_name AS S_NAME, FILENAME_PLACEHOLDER_3.N_NAME,
FILENAME_PLACEHOLDER_0.p_partkey AS P_PARTKEY, FILENAME_PLACEHOLDER_0.p_mfgr AS
P_MFGR, FILENAME_PLACEHOLDER_1.s_address AS S_ADDRESS,
FILENAME_PLACEHOLDER_1.s_phone AS S_PHONE, FILENAME_PLACEHOLDER_1.s_comment AS
S_COMMENT\
- \n Limit: skip=0, fetch=100\
- \n Sort: FILENAME_PLACEHOLDER_1.s_acctbal DESC NULLS FIRST,
FILENAME_PLACEHOLDER_3.N_NAME ASC NULLS LAST, FILENAME_PLACEHOLDER_1.s_name ASC
NULLS LAST, FILENAME_PLACEHOLDER_0.p_partkey ASC NULLS LAST\
- \n Projection: FILENAME_PLACEHOLDER_1.s_acctbal,
FILENAME_PLACEHOLDER_1.s_name, FILENAME_PLACEHOLDER_3.N_NAME,
FILENAME_PLACEHOLDER_0.p_partkey, FILENAME_PLACEHOLDER_0.p_mfgr,
FILENAME_PLACEHOLDER_1.s_address, FILENAME_PLACEHOLDER_1.s_phone,
FILENAME_PLACEHOLDER_1.s_comment\
- \n Filter: FILENAME_PLACEHOLDER_0.p_partkey =
FILENAME_PLACEHOLDER_2.ps_partkey AND FILENAME_PLACEHOLDER_1.s_suppkey =
FILENAME_PLACEHOLDER_2.ps_suppkey AND FILENAME_PLACEHOLDER_0.p_size = Int32(15)
AND FILENAME_PLACEHOLDER_0.p_type LIKE CAST(Utf8(\"%BRASS\") AS Utf8) AND
FILENAME_PLACEHOLDER_1.s_nationkey = FILENAME_PLACEHOLDER_3.N_NATIONKEY AND
FILENAME_PLACEHOLDER_3.N_REGIONKEY = FILENAME_PLACEHOLDER_4.R_REGIONKEY AND
FILENAME_PLACEHOLDER_4.R_NAME = CAST(Utf8(\"EUROPE\") AS Utf8) AND
FILENAME_PLACEHOLDER_2.ps_supplycost = (<subquery>)\
- \n Subquery:\
- \n Aggregate: groupBy=[[]],
aggr=[[min(FILENAME_PLACEHOLDER_5.ps_supplycost)]]\
- \n Projection: FILENAME_PLACEHOLDER_5.ps_supplycost\
- \n Filter: FILENAME_PLACEHOLDER_5.ps_partkey =
FILENAME_PLACEHOLDER_5.ps_partkey AND FILENAME_PLACEHOLDER_6.s_suppkey =
FILENAME_PLACEHOLDER_5.ps_suppkey AND FILENAME_PLACEHOLDER_6.s_nationkey =
FILENAME_PLACEHOLDER_7.N_NATIONKEY AND FILENAME_PLACEHOLDER_7.N_REGIONKEY =
FILENAME_PLACEHOLDER_8.R_REGIONKEY AND FILENAME_PLACEHOLDER_8.R_NAME =
CAST(Utf8(\"EUROPE\") AS Utf8)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n TableScan: FILENAME_PLACEHOLDER_5
projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_6
projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal,
s_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_7
projection=[N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT]\
- \n TableScan: FILENAME_PLACEHOLDER_8
projection=[R_REGIONKEY, R_NAME, R_COMMENT]\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n TableScan: FILENAME_PLACEHOLDER_0
projection=[p_partkey, p_name, p_mfgr, p_brand, p_type, p_size, p_container,
p_retailprice, p_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_1
projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal,
s_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_2
projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_3
projection=[N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT]\
- \n TableScan: FILENAME_PLACEHOLDER_4
projection=[R_REGIONKEY, R_NAME, R_COMMENT]"
+ "Limit: skip=0, fetch=100\
+ \n Sort: SUPPLIER.S_ACCTBAL DESC NULLS FIRST, NATION.N_NAME ASC
NULLS LAST, SUPPLIER.S_NAME ASC NULLS LAST, PART.P_PARTKEY ASC NULLS LAST\
+ \n Projection: SUPPLIER.S_ACCTBAL, SUPPLIER.S_NAME,
NATION.N_NAME, PART.P_PARTKEY, PART.P_MFGR, SUPPLIER.S_ADDRESS,
SUPPLIER.S_PHONE, SUPPLIER.S_COMMENT\
+ \n Filter: PART.P_PARTKEY = PARTSUPP.PS_PARTKEY AND
SUPPLIER.S_SUPPKEY = PARTSUPP.PS_SUPPKEY AND PART.P_SIZE = Int32(15) AND
PART.P_TYPE LIKE CAST(Utf8(\"%BRASS\") AS Utf8) AND SUPPLIER.S_NATIONKEY =
NATION.N_NATIONKEY AND NATION.N_REGIONKEY = REGION.R_REGIONKEY AND
REGION.R_NAME = Utf8(\"EUROPE\") AND PARTSUPP.PS_SUPPLYCOST = (<subquery>)\
+ \n Subquery:\
+ \n Aggregate: groupBy=[[]],
aggr=[[min(PARTSUPP.PS_SUPPLYCOST)]]\
+ \n Projection: PARTSUPP.PS_SUPPLYCOST\
+ \n Filter: PARTSUPP.PS_PARTKEY = PARTSUPP.PS_PARTKEY
AND SUPPLIER.S_SUPPKEY = PARTSUPP.PS_SUPPKEY AND SUPPLIER.S_NATIONKEY =
NATION.N_NATIONKEY AND NATION.N_REGIONKEY = REGION.R_REGIONKEY AND
REGION.R_NAME = Utf8(\"EUROPE\")\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n TableScan: PARTSUPP
projection=[PS_PARTKEY, PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT]\
+ \n TableScan: SUPPLIER projection=[S_SUPPKEY,
S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT]\
+ \n TableScan: NATION projection=[N_NATIONKEY,
N_NAME, N_REGIONKEY, N_COMMENT]\
+ \n TableScan: REGION projection=[R_REGIONKEY,
R_NAME, R_COMMENT]\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n TableScan: PART projection=[P_PARTKEY, P_NAME,
P_MFGR, P_BRAND, P_TYPE, P_SIZE, P_CONTAINER, P_RETAILPRICE, P_COMMENT]\
+ \n TableScan: SUPPLIER projection=[S_SUPPKEY,
S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT]\
+ \n TableScan: PARTSUPP projection=[PS_PARTKEY,
PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT]\
+ \n TableScan: NATION projection=[N_NATIONKEY, N_NAME,
N_REGIONKEY, N_COMMENT]\
+ \n TableScan: REGION projection=[R_REGIONKEY, R_NAME,
R_COMMENT]"
);
Ok(())
}
#[tokio::test]
- async fn tpch_test_3() -> Result<()> {
- let ctx = create_context(vec![
- ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/customer.csv"),
- ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/orders.csv"),
- ("FILENAME_PLACEHOLDER_2", "tests/testdata/tpch/lineitem.csv"),
- ])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_3.json";
- let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
- File::open(path).expect("file not found"),
- ))
- .expect("failed to parse json");
+ async fn tpch_test_03() -> Result<()> {
+ let plan_str = tpch_plan_to_string(3).await?;
+ assert_eq!(
+ plan_str,
+ "Projection: LINEITEM.L_ORDERKEY, sum(LINEITEM.L_EXTENDEDPRICE *
Int32(1) - LINEITEM.L_DISCOUNT) AS REVENUE, ORDERS.O_ORDERDATE,
ORDERS.O_SHIPPRIORITY\
+ \n Limit: skip=0, fetch=10\
+ \n Sort: sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT) DESC NULLS FIRST, ORDERS.O_ORDERDATE ASC NULLS LAST\
+ \n Projection: LINEITEM.L_ORDERKEY,
sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT),
ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY\
+ \n Aggregate: groupBy=[[LINEITEM.L_ORDERKEY,
ORDERS.O_ORDERDATE, ORDERS.O_SHIPPRIORITY]],
aggr=[[sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT)]]\
+ \n Projection: LINEITEM.L_ORDERKEY, ORDERS.O_ORDERDATE,
ORDERS.O_SHIPPRIORITY, LINEITEM.L_EXTENDEDPRICE * (CAST(Int32(1) AS
Decimal128(15, 2)) - LINEITEM.L_DISCOUNT)\
+ \n Filter: CUSTOMER.C_MKTSEGMENT = Utf8(\"BUILDING\")
AND CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY AND LINEITEM.L_ORDERKEY =
ORDERS.O_ORDERKEY AND ORDERS.O_ORDERDATE < CAST(Utf8(\"1995-03-15\") AS Date32)
AND LINEITEM.L_SHIPDATE > CAST(Utf8(\"1995-03-15\") AS Date32)\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n TableScan: LINEITEM projection=[L_ORDERKEY,
L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT,
L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]\
+ \n TableScan: CUSTOMER projection=[C_CUSTKEY,
C_NAME, C_ADDRESS, C_NATIONKEY, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT]\
+ \n TableScan: ORDERS projection=[O_ORDERKEY,
O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK,
O_SHIPPRIORITY, O_COMMENT]"
+ );
+ Ok(())
+ }
- let plan = from_substrait_plan(&ctx, &proto).await?;
- let plan_str = format!("{}", plan);
- assert_eq!(plan_str, "Projection: FILENAME_PLACEHOLDER_2.l_orderkey AS
L_ORDERKEY, sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount) AS REVENUE,
FILENAME_PLACEHOLDER_1.o_orderdate AS O_ORDERDATE,
FILENAME_PLACEHOLDER_1.o_shippriority AS O_SHIPPRIORITY\
- \n Limit: skip=0, fetch=10\
- \n Sort: sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount) DESC NULLS FIRST,
FILENAME_PLACEHOLDER_1.o_orderdate ASC NULLS LAST\
- \n Projection: FILENAME_PLACEHOLDER_2.l_orderkey,
sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount), FILENAME_PLACEHOLDER_1.o_orderdate,
FILENAME_PLACEHOLDER_1.o_shippriority\
- \n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_2.l_orderkey,
FILENAME_PLACEHOLDER_1.o_orderdate, FILENAME_PLACEHOLDER_1.o_shippriority]],
aggr=[[sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount)]]\
- \n Projection: FILENAME_PLACEHOLDER_2.l_orderkey,
FILENAME_PLACEHOLDER_1.o_orderdate, FILENAME_PLACEHOLDER_1.o_shippriority,
FILENAME_PLACEHOLDER_2.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) -
FILENAME_PLACEHOLDER_2.l_discount)\
- \n Filter: FILENAME_PLACEHOLDER_0.c_mktsegment =
CAST(Utf8(\"HOUSEHOLD\") AS Utf8) AND FILENAME_PLACEHOLDER_0.c_custkey =
FILENAME_PLACEHOLDER_1.o_custkey AND FILENAME_PLACEHOLDER_2.l_orderkey =
FILENAME_PLACEHOLDER_1.o_orderkey AND FILENAME_PLACEHOLDER_1.o_orderdate <
Date32(\"1995-03-25\") AND FILENAME_PLACEHOLDER_2.l_shipdate >
Date32(\"1995-03-25\")\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n TableScan: FILENAME_PLACEHOLDER_0
projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal,
c_mktsegment, c_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_1
projection=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate,
o_orderpriority, o_clerk, o_shippriority, o_comment]\n
TableScan: FILENAME_PLACEHOLDER_2 projection=[l_orderkey, l_partkey, l_suppkey,
l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,
l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct,
l_shipmode, l_comment]");
+ #[tokio::test]
+ async fn tpch_test_04() -> Result<()> {
+ let plan_str = tpch_plan_to_string(4).await?;
+ assert_eq!(
+ plan_str,
+ "Projection: ORDERS.O_ORDERPRIORITY, count(Int64(1)) AS
ORDER_COUNT\
+ \n Sort: ORDERS.O_ORDERPRIORITY ASC NULLS LAST\
+ \n Aggregate: groupBy=[[ORDERS.O_ORDERPRIORITY]],
aggr=[[count(Int64(1))]]\
+ \n Projection: ORDERS.O_ORDERPRIORITY\
+ \n Filter: ORDERS.O_ORDERDATE >= CAST(Utf8(\"1993-07-01\")
AS Date32) AND ORDERS.O_ORDERDATE < CAST(Utf8(\"1993-10-01\") AS Date32) AND
EXISTS (<subquery>)\
+ \n Subquery:\
+ \n Filter: LINEITEM.L_ORDERKEY = LINEITEM.L_ORDERKEY
AND LINEITEM.L_COMMITDATE < LINEITEM.L_RECEIPTDATE\
+ \n TableScan: LINEITEM projection=[L_ORDERKEY,
L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT,
L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]\
+ \n TableScan: ORDERS projection=[O_ORDERKEY, O_CUSTKEY,
O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK,
O_SHIPPRIORITY, O_COMMENT]"
+ );
Ok(())
}
#[tokio::test]
- async fn tpch_test_4() -> Result<()> {
- let ctx = create_context(vec![
- ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/orders.csv"),
- ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/lineitem.csv"),
- ])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_4.json";
- let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
- File::open(path).expect("file not found"),
- ))
- .expect("failed to parse json");
- let plan = from_substrait_plan(&ctx, &proto).await?;
- let plan_str = format!("{}", plan);
- assert_eq!(plan_str, "Projection:
FILENAME_PLACEHOLDER_0.o_orderpriority AS O_ORDERPRIORITY, count(Int64(1)) AS
ORDER_COUNT\
- \n Sort: FILENAME_PLACEHOLDER_0.o_orderpriority ASC NULLS LAST\
- \n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.o_orderpriority]],
aggr=[[count(Int64(1))]]\
- \n Projection: FILENAME_PLACEHOLDER_0.o_orderpriority\
- \n Filter: FILENAME_PLACEHOLDER_0.o_orderdate >=
CAST(Utf8(\"1993-07-01\") AS Date32) AND FILENAME_PLACEHOLDER_0.o_orderdate <
CAST(Utf8(\"1993-10-01\") AS Date32) AND EXISTS (<subquery>)\
- \n Subquery:\
- \n Filter: FILENAME_PLACEHOLDER_1.l_orderkey =
FILENAME_PLACEHOLDER_1.l_orderkey AND FILENAME_PLACEHOLDER_1.l_commitdate <
FILENAME_PLACEHOLDER_1.l_receiptdate\
- \n TableScan: FILENAME_PLACEHOLDER_1
projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate,
l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_0 projection=[o_orderkey,
o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk,
o_shippriority, o_comment]");
+ async fn tpch_test_05() -> Result<()> {
+ let plan_str = tpch_plan_to_string(5).await?;
+ assert_eq!(
+ plan_str,
+ "Projection: NATION.N_NAME, sum(LINEITEM.L_EXTENDEDPRICE *
Int32(1) - LINEITEM.L_DISCOUNT) AS REVENUE\
+ \n Sort: sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT) DESC NULLS FIRST\
+ \n Aggregate: groupBy=[[NATION.N_NAME]],
aggr=[[sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT)]]\
+ \n Projection: NATION.N_NAME, LINEITEM.L_EXTENDEDPRICE *
(CAST(Int32(1) AS Decimal128(15, 2)) - LINEITEM.L_DISCOUNT)\
+ \n Filter: CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY AND
LINEITEM.L_ORDERKEY = ORDERS.O_ORDERKEY AND LINEITEM.L_SUPPKEY =
SUPPLIER.S_SUPPKEY AND CUSTOMER.C_NATIONKEY = SUPPLIER.S_NATIONKEY AND
SUPPLIER.S_NATIONKEY = NATION.N_NATIONKEY AND NATION.N_REGIONKEY =
REGION.R_REGIONKEY AND REGION.R_NAME = Utf8(\"ASIA\") AND ORDERS.O_ORDERDATE >=
CAST(Utf8(\"1994-01-01\") AS Date32) AND ORDERS.O_ORDERDATE <
CAST(Utf8(\"1995-01-01\") AS Date32)\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n TableScan: CUSTOMER projection=[C_CUSTKEY,
C_NAME, C_ADDRESS, C_NATIONKEY, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT]\
+ \n TableScan: ORDERS projection=[O_ORDERKEY,
O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK,
O_SHIPPRIORITY, O_COMMENT]\
+ \n TableScan: LINEITEM projection=[L_ORDERKEY,
L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT,
L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]\
+ \n TableScan: SUPPLIER projection=[S_SUPPKEY,
S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT]\
+ \n TableScan: NATION projection=[N_NATIONKEY, N_NAME,
N_REGIONKEY, N_COMMENT]\
+ \n TableScan: REGION projection=[R_REGIONKEY, R_NAME,
R_COMMENT]"
+ );
Ok(())
}
#[tokio::test]
- async fn tpch_test_5() -> Result<()> {
- let ctx = create_context(vec![
- ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/customer.csv"),
- ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/orders.csv"),
- ("FILENAME_PLACEHOLDER_2", "tests/testdata/tpch/lineitem.csv"),
- ("FILENAME_PLACEHOLDER_3", "tests/testdata/tpch/supplier.csv"),
- ("NATION", "tests/testdata/tpch/nation.csv"),
- ("REGION", "tests/testdata/tpch/region.csv"),
- ])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_5.json";
- let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
- File::open(path).expect("file not found"),
- ))
- .expect("failed to parse json");
+ async fn tpch_test_06() -> Result<()> {
+ let plan_str = tpch_plan_to_string(6).await?;
+ assert_eq!(
+ plan_str,
+ "Aggregate: groupBy=[[]], aggr=[[sum(LINEITEM.L_EXTENDEDPRICE *
LINEITEM.L_DISCOUNT) AS REVENUE]]\
+ \n Projection: LINEITEM.L_EXTENDEDPRICE * LINEITEM.L_DISCOUNT\
+ \n Filter: LINEITEM.L_SHIPDATE >= CAST(Utf8(\"1994-01-01\") AS
Date32) AND LINEITEM.L_SHIPDATE < CAST(Utf8(\"1995-01-01\") AS Date32) AND
LINEITEM.L_DISCOUNT >= Decimal128(Some(5),3,2) AND LINEITEM.L_DISCOUNT <=
Decimal128(Some(7),3,2) AND LINEITEM.L_QUANTITY < CAST(Int32(24) AS
Decimal128(15, 2))\
+ \n TableScan: LINEITEM projection=[L_ORDERKEY, L_PARTKEY,
L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX,
L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]"
+ );
+ Ok(())
+ }
- let plan = from_substrait_plan(&ctx, &proto).await?;
- let plan_str = format!("{}", plan);
- assert_eq!(plan_str, "Projection: NATION.N_NAME,
sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount) AS REVENUE\
- \n Sort: sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount) DESC NULLS FIRST\
- \n Aggregate: groupBy=[[NATION.N_NAME]],
aggr=[[sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount)]]\
- \n Projection: NATION.N_NAME,
FILENAME_PLACEHOLDER_2.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) -
FILENAME_PLACEHOLDER_2.l_discount)\
- \n Filter: FILENAME_PLACEHOLDER_0.c_custkey =
FILENAME_PLACEHOLDER_1.o_custkey AND FILENAME_PLACEHOLDER_2.l_orderkey =
FILENAME_PLACEHOLDER_1.o_orderkey AND FILENAME_PLACEHOLDER_2.l_suppkey =
FILENAME_PLACEHOLDER_3.s_suppkey AND FILENAME_PLACEHOLDER_0.c_nationkey =
FILENAME_PLACEHOLDER_3.s_nationkey AND FILENAME_PLACEHOLDER_3.s_nationkey =
NATION.N_NATIONKEY AND NATION.N_REGIONKEY = REGION.R_REGIONKEY AND
REGION.R_NAME = CAST(Utf8(\"ASIA\") AS Utf8) AND
FILENAME_PLACEHOLDER_1.o_orderdate >= CAST(Utf8(\"1994-01-01\") AS Date32) AND
FILENAME_PLACEHOLDER_1.o_orderdate < CAST(Utf8(\"1995-01-01\") AS Date32)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n TableScan: FILENAME_PLACEHOLDER_0
projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal,
c_mktsegment, c_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_1
projection=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate,
o_orderpriority, o_clerk, o_shippriority, o_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_2
projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate,
l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_3
projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal,
s_comment]\
- \n TableScan: NATION projection=[N_NATIONKEY, N_NAME,
N_REGIONKEY, N_COMMENT]\
- \n TableScan: REGION projection=[R_REGIONKEY, R_NAME,
R_COMMENT]");
+ #[ignore]
+ #[tokio::test]
+ async fn tpch_test_07() -> Result<()> {
+ let plan_str = tpch_plan_to_string(7).await?;
+ assert_eq!(plan_str, "Missing support for enum function arguments");
Ok(())
}
+ #[ignore]
#[tokio::test]
- async fn tpch_test_6() -> Result<()> {
- let ctx = create_context(vec![(
- "FILENAME_PLACEHOLDER_0",
- "tests/testdata/tpch/lineitem.csv",
- )])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_6.json";
- let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
- File::open(path).expect("file not found"),
- ))
- .expect("failed to parse json");
+ async fn tpch_test_08() -> Result<()> {
+ let plan_str = tpch_plan_to_string(8).await?;
+ assert_eq!(plan_str, "Missing support for enum function arguments");
+ Ok(())
+ }
- let plan = from_substrait_plan(&ctx, &proto).await?;
- let plan_str = format!("{}", plan);
- assert_eq!(plan_str, "Aggregate: groupBy=[[]],
aggr=[[sum(FILENAME_PLACEHOLDER_0.l_extendedprice *
FILENAME_PLACEHOLDER_0.l_discount) AS REVENUE]]\
- \n Projection: FILENAME_PLACEHOLDER_0.l_extendedprice *
FILENAME_PLACEHOLDER_0.l_discount\
- \n Filter: FILENAME_PLACEHOLDER_0.l_shipdate >=
CAST(Utf8(\"1994-01-01\") AS Date32) AND FILENAME_PLACEHOLDER_0.l_shipdate <
CAST(Utf8(\"1995-01-01\") AS Date32) AND FILENAME_PLACEHOLDER_0.l_discount >=
Decimal128(Some(5),3,2) AND FILENAME_PLACEHOLDER_0.l_discount <=
Decimal128(Some(7),3,2) AND FILENAME_PLACEHOLDER_0.l_quantity < CAST(Int32(24)
AS Decimal128(19, 0))\
- \n TableScan: FILENAME_PLACEHOLDER_0 projection=[l_orderkey,
l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate,
l_shipinstruct, l_shipmode, l_comment]");
+ #[ignore]
+ #[tokio::test]
+ async fn tpch_test_09() -> Result<()> {
+ let plan_str = tpch_plan_to_string(9).await?;
+ assert_eq!(plan_str, "Missing support for enum function arguments");
Ok(())
}
- // TODO: missing plan 7, 8, 9
#[tokio::test]
async fn tpch_test_10() -> Result<()> {
- let ctx = create_context(vec![
- ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/customer.csv"),
- ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/orders.csv"),
- ("FILENAME_PLACEHOLDER_2", "tests/testdata/tpch/lineitem.csv"),
- ("FILENAME_PLACEHOLDER_3", "tests/testdata/tpch/nation.csv"),
- ])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_10.json";
- let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
- File::open(path).expect("file not found"),
- ))
- .expect("failed to parse json");
-
- let plan = from_substrait_plan(&ctx, &proto).await?;
- let plan_str = format!("{}", plan);
- assert_eq!(plan_str, "Projection: FILENAME_PLACEHOLDER_0.c_custkey AS
C_CUSTKEY, FILENAME_PLACEHOLDER_0.c_name AS C_NAME,
sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount) AS REVENUE, FILENAME_PLACEHOLDER_0.c_acctbal
AS C_ACCTBAL, FILENAME_PLACEHOLDER_3.N_NAME, FILENAME_PLACEHOLDER_0.c_address
AS C_ADDRESS, FILENAME_PLACEHOLDER_0.c_phone AS C_PHONE,
FILENAME_PLACEHOLDER_0.c_comment AS C_COMMENT\
- \n Limit: skip=0, fetch=20\
- \n Sort: sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount) DESC NULLS FIRST\
- \n Projection: FILENAME_PLACEHOLDER_0.c_custkey,
FILENAME_PLACEHOLDER_0.c_name, sum(FILENAME_PLACEHOLDER_2.l_extendedprice *
Int32(1) - FILENAME_PLACEHOLDER_2.l_discount),
FILENAME_PLACEHOLDER_0.c_acctbal, FILENAME_PLACEHOLDER_3.N_NAME,
FILENAME_PLACEHOLDER_0.c_address, FILENAME_PLACEHOLDER_0.c_phone,
FILENAME_PLACEHOLDER_0.c_comment\n Aggregate:
groupBy=[[FILENAME_PLACEHOLDER_0.c_custkey, FILENAME_PLACEHOLDER_0.c_name,
FILENAME_PLACEHOLDER_0.c_acctbal, FILENAME_PLACEHOLDER_0.c_phone,
FILENAME_PLACEHOLDER_3.N_NAME, FILENAME_PLACEHOLDER_0.c_address,
FILENAME_PLACEHOLDER_0.c_comment]],
aggr=[[sum(FILENAME_PLACEHOLDER_2.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_2.l_discount)]]\
- \n Projection: FILENAME_PLACEHOLDER_0.c_custkey,
FILENAME_PLACEHOLDER_0.c_name, FILENAME_PLACEHOLDER_0.c_acctbal,
FILENAME_PLACEHOLDER_0.c_phone, FILENAME_PLACEHOLDER_3.N_NAME,
FILENAME_PLACEHOLDER_0.c_address, FILENAME_PLACEHOLDER_0.c_comment,
FILENAME_PLACEHOLDER_2.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) -
FILENAME_PLACEHOLDER_2.l_discount)\
- \n Filter: FILENAME_PLACEHOLDER_0.c_custkey =
FILENAME_PLACEHOLDER_1.o_custkey AND FILENAME_PLACEHOLDER_2.l_orderkey =
FILENAME_PLACEHOLDER_1.o_orderkey AND FILENAME_PLACEHOLDER_1.o_orderdate >=
CAST(Utf8(\"1993-10-01\") AS Date32) AND FILENAME_PLACEHOLDER_1.o_orderdate <
CAST(Utf8(\"1994-01-01\") AS Date32) AND FILENAME_PLACEHOLDER_2.l_returnflag =
Utf8(\"R\") AND FILENAME_PLACEHOLDER_0.c_nationkey =
FILENAME_PLACEHOLDER_3.N_NATIONKEY\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n TableScan: FILENAME_PLACEHOLDER_0
projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal,
c_mktsegment, c_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_1
projection=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate,
o_orderpriority, o_clerk, o_shippriority, o_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_2
projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity,
l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate,
l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_3
projection=[N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT]");
+ let plan_str = tpch_plan_to_string(10).await?;
+ assert_eq!(
+ plan_str,
+ "Projection: CUSTOMER.C_CUSTKEY, CUSTOMER.C_NAME,
sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT) AS REVENUE,
CUSTOMER.C_ACCTBAL, NATION.N_NAME, CUSTOMER.C_ADDRESS, CUSTOMER.C_PHONE,
CUSTOMER.C_COMMENT\
+ \n Limit: skip=0, fetch=20\
+ \n Sort: sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT) DESC NULLS FIRST\
+ \n Projection: CUSTOMER.C_CUSTKEY, CUSTOMER.C_NAME,
sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT),
CUSTOMER.C_ACCTBAL, NATION.N_NAME, CUSTOMER.C_ADDRESS, CUSTOMER.C_PHONE,
CUSTOMER.C_COMMENT\
+ \n Aggregate: groupBy=[[CUSTOMER.C_CUSTKEY,
CUSTOMER.C_NAME, CUSTOMER.C_ACCTBAL, CUSTOMER.C_PHONE, NATION.N_NAME,
CUSTOMER.C_ADDRESS, CUSTOMER.C_COMMENT]], aggr=[[sum(LINEITEM.L_EXTENDEDPRICE *
Int32(1) - LINEITEM.L_DISCOUNT)]]\
+ \n Projection: CUSTOMER.C_CUSTKEY, CUSTOMER.C_NAME,
CUSTOMER.C_ACCTBAL, CUSTOMER.C_PHONE, NATION.N_NAME, CUSTOMER.C_ADDRESS,
CUSTOMER.C_COMMENT, LINEITEM.L_EXTENDEDPRICE * (CAST(Int32(1) AS Decimal128(15,
2)) - LINEITEM.L_DISCOUNT)\
+ \n Filter: CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY AND
LINEITEM.L_ORDERKEY = ORDERS.O_ORDERKEY AND ORDERS.O_ORDERDATE >=
CAST(Utf8(\"1993-10-01\") AS Date32) AND ORDERS.O_ORDERDATE <
CAST(Utf8(\"1994-01-01\") AS Date32) AND LINEITEM.L_RETURNFLAG = Utf8(\"R\")
AND CUSTOMER.C_NATIONKEY = NATION.N_NATIONKEY\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n TableScan: CUSTOMER projection=[C_CUSTKEY,
C_NAME, C_ADDRESS, C_NATIONKEY, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT]\
+ \n TableScan: ORDERS projection=[O_ORDERKEY,
O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK,
O_SHIPPRIORITY, O_COMMENT]\
+ \n TableScan: LINEITEM projection=[L_ORDERKEY,
L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT,
L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]\
+ \n TableScan: NATION projection=[N_NATIONKEY,
N_NAME, N_REGIONKEY, N_COMMENT]"
+ );
Ok(())
}
#[tokio::test]
async fn tpch_test_11() -> Result<()> {
- let ctx = create_context(vec![
- ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/partsupp.csv"),
- ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/supplier.csv"),
- ("FILENAME_PLACEHOLDER_2", "tests/testdata/tpch/nation.csv"),
- ("FILENAME_PLACEHOLDER_3", "tests/testdata/tpch/partsupp.csv"),
- ("FILENAME_PLACEHOLDER_4", "tests/testdata/tpch/supplier.csv"),
- ("FILENAME_PLACEHOLDER_5", "tests/testdata/tpch/nation.csv"),
- ])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_11.json";
- let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
- File::open(path).expect("file not found"),
- ))
- .expect("failed to parse json");
+ let plan_str = tpch_plan_to_string(11).await?;
+ assert_eq!(
+ plan_str,
+ "Projection: PARTSUPP.PS_PARTKEY, sum(PARTSUPP.PS_SUPPLYCOST *
PARTSUPP.PS_AVAILQTY) AS value\
+ \n Sort: sum(PARTSUPP.PS_SUPPLYCOST * PARTSUPP.PS_AVAILQTY) DESC
NULLS FIRST\
+ \n Filter: sum(PARTSUPP.PS_SUPPLYCOST * PARTSUPP.PS_AVAILQTY) >
(<subquery>)\
+ \n Subquery:\
+ \n Projection: sum(PARTSUPP.PS_SUPPLYCOST *
PARTSUPP.PS_AVAILQTY) * Decimal128(Some(1000000),11,10)\
+ \n Aggregate: groupBy=[[]],
aggr=[[sum(PARTSUPP.PS_SUPPLYCOST * PARTSUPP.PS_AVAILQTY)]]\
+ \n Projection: PARTSUPP.PS_SUPPLYCOST *
CAST(PARTSUPP.PS_AVAILQTY AS Decimal128(19, 0))\
+ \n Filter: PARTSUPP.PS_SUPPKEY = SUPPLIER.S_SUPPKEY
AND SUPPLIER.S_NATIONKEY = NATION.N_NATIONKEY AND NATION.N_NAME =
Utf8(\"JAPAN\")\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n TableScan: PARTSUPP projection=[PS_PARTKEY,
PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT]\
+ \n TableScan: SUPPLIER projection=[S_SUPPKEY,
S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT]\
+ \n TableScan: NATION projection=[N_NATIONKEY,
N_NAME, N_REGIONKEY, N_COMMENT]\
+ \n Aggregate: groupBy=[[PARTSUPP.PS_PARTKEY]],
aggr=[[sum(PARTSUPP.PS_SUPPLYCOST * PARTSUPP.PS_AVAILQTY)]]\
+ \n Projection: PARTSUPP.PS_PARTKEY, PARTSUPP.PS_SUPPLYCOST
* CAST(PARTSUPP.PS_AVAILQTY AS Decimal128(19, 0))\
+ \n Filter: PARTSUPP.PS_SUPPKEY = SUPPLIER.S_SUPPKEY AND
SUPPLIER.S_NATIONKEY = NATION.N_NATIONKEY AND NATION.N_NAME = Utf8(\"JAPAN\")\
+ \n CrossJoin:\
+ \n CrossJoin:\
+ \n TableScan: PARTSUPP projection=[PS_PARTKEY,
PS_SUPPKEY, PS_AVAILQTY, PS_SUPPLYCOST, PS_COMMENT]\
+ \n TableScan: SUPPLIER projection=[S_SUPPKEY,
S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE, S_ACCTBAL, S_COMMENT]\
+ \n TableScan: NATION projection=[N_NATIONKEY, N_NAME,
N_REGIONKEY, N_COMMENT]"
+ );
+ Ok(())
+ }
- let plan = from_substrait_plan(&ctx, &proto).await?;
- let plan_str = format!("{}", plan);
- assert_eq!(plan_str, "Projection: FILENAME_PLACEHOLDER_0.ps_partkey AS
PS_PARTKEY, sum(FILENAME_PLACEHOLDER_0.ps_supplycost *
FILENAME_PLACEHOLDER_0.ps_availqty) AS value\
- \n Sort: sum(FILENAME_PLACEHOLDER_0.ps_supplycost *
FILENAME_PLACEHOLDER_0.ps_availqty) DESC NULLS FIRST\
- \n Filter: sum(FILENAME_PLACEHOLDER_0.ps_supplycost *
FILENAME_PLACEHOLDER_0.ps_availqty) > (<subquery>)\
- \n Subquery:\
- \n Projection: sum(FILENAME_PLACEHOLDER_3.ps_supplycost *
FILENAME_PLACEHOLDER_3.ps_availqty) * Decimal128(Some(1000000),11,10)\
- \n Aggregate: groupBy=[[]],
aggr=[[sum(FILENAME_PLACEHOLDER_3.ps_supplycost *
FILENAME_PLACEHOLDER_3.ps_availqty)]]\
- \n Projection: FILENAME_PLACEHOLDER_3.ps_supplycost *
CAST(FILENAME_PLACEHOLDER_3.ps_availqty AS Decimal128(19, 0))\
- \n Filter: FILENAME_PLACEHOLDER_3.ps_suppkey =
FILENAME_PLACEHOLDER_4.s_suppkey AND FILENAME_PLACEHOLDER_4.s_nationkey =
FILENAME_PLACEHOLDER_5.N_NATIONKEY AND FILENAME_PLACEHOLDER_5.N_NAME =
CAST(Utf8(\"JAPAN\") AS Utf8)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n TableScan: FILENAME_PLACEHOLDER_3
projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_4
projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal,
s_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_5
projection=[N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT]\
- \n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.ps_partkey]],
aggr=[[sum(FILENAME_PLACEHOLDER_0.ps_supplycost *
FILENAME_PLACEHOLDER_0.ps_availqty)]]\
- \n Projection: FILENAME_PLACEHOLDER_0.ps_partkey,
FILENAME_PLACEHOLDER_0.ps_supplycost * CAST(FILENAME_PLACEHOLDER_0.ps_availqty
AS Decimal128(19, 0))\
- \n Filter: FILENAME_PLACEHOLDER_0.ps_suppkey =
FILENAME_PLACEHOLDER_1.s_suppkey AND FILENAME_PLACEHOLDER_1.s_nationkey =
FILENAME_PLACEHOLDER_2.N_NATIONKEY AND FILENAME_PLACEHOLDER_2.N_NAME =
CAST(Utf8(\"JAPAN\") AS Utf8)\
- \n Inner Join: Filter: Boolean(true)\
- \n Inner Join: Filter: Boolean(true)\
- \n TableScan: FILENAME_PLACEHOLDER_0
projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost, ps_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_1
projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal,
s_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_2
projection=[N_NATIONKEY, N_NAME, N_REGIONKEY, N_COMMENT]");
+ #[tokio::test]
+ async fn tpch_test_12() -> Result<()> {
+ let plan_str = tpch_plan_to_string(12).await?;
+ assert_eq!(
+ plan_str,
+ "Projection: LINEITEM.L_SHIPMODE, sum(CASE WHEN
ORDERS.O_ORDERPRIORITY = Utf8(\"1-URGENT\") OR ORDERS.O_ORDERPRIORITY =
Utf8(\"2-HIGH\") THEN Int32(1) ELSE Int32(0) END) AS HIGH_LINE_COUNT, sum(CASE
WHEN ORDERS.O_ORDERPRIORITY != Utf8(\"1-URGENT\") AND ORDERS.O_ORDERPRIORITY !=
Utf8(\"2-HIGH\") THEN Int32(1) ELSE Int32(0) END) AS LOW_LINE_COUNT\
+ \n Sort: LINEITEM.L_SHIPMODE ASC NULLS LAST\
+ \n Aggregate: groupBy=[[LINEITEM.L_SHIPMODE]], aggr=[[sum(CASE
WHEN ORDERS.O_ORDERPRIORITY = Utf8(\"1-URGENT\") OR ORDERS.O_ORDERPRIORITY =
Utf8(\"2-HIGH\") THEN Int32(1) ELSE Int32(0) END), sum(CASE WHEN
ORDERS.O_ORDERPRIORITY != Utf8(\"1-URGENT\") AND ORDERS.O_ORDERPRIORITY !=
Utf8(\"2-HIGH\") THEN Int32(1) ELSE Int32(0) END)]]\
+ \n Projection: LINEITEM.L_SHIPMODE, CASE WHEN
ORDERS.O_ORDERPRIORITY = Utf8(\"1-URGENT\") OR ORDERS.O_ORDERPRIORITY =
Utf8(\"2-HIGH\") THEN Int32(1) ELSE Int32(0) END, CASE WHEN
ORDERS.O_ORDERPRIORITY != Utf8(\"1-URGENT\") AND ORDERS.O_ORDERPRIORITY !=
Utf8(\"2-HIGH\") THEN Int32(1) ELSE Int32(0) END\
+ \n Filter: ORDERS.O_ORDERKEY = LINEITEM.L_ORDERKEY AND
(LINEITEM.L_SHIPMODE = CAST(Utf8(\"MAIL\") AS Utf8) OR LINEITEM.L_SHIPMODE =
CAST(Utf8(\"SHIP\") AS Utf8)) AND LINEITEM.L_COMMITDATE <
LINEITEM.L_RECEIPTDATE AND LINEITEM.L_SHIPDATE < LINEITEM.L_COMMITDATE AND
LINEITEM.L_RECEIPTDATE >= CAST(Utf8(\"1994-01-01\") AS Date32) AND
LINEITEM.L_RECEIPTDATE < CAST(Utf8(\"1995-01-01\") AS Date32)\
+ \n CrossJoin:\
+ \n TableScan: ORDERS projection=[O_ORDERKEY, O_CUSTKEY,
O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK,
O_SHIPPRIORITY, O_COMMENT]\
+ \n TableScan: LINEITEM projection=[L_ORDERKEY,
L_PARTKEY, L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT,
L_TAX, L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]"
+ );
Ok(())
}
- // missing query 12
#[tokio::test]
async fn tpch_test_13() -> Result<()> {
- let ctx = create_context(vec![
- ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/customer.csv"),
- ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/orders.csv"),
- ])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_13.json";
- let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
- File::open(path).expect("file not found"),
- ))
- .expect("failed to parse json");
-
- let plan = from_substrait_plan(&ctx, &proto).await?;
- let plan_str = format!("{}", plan);
- assert_eq!(plan_str, "Projection:
count(FILENAME_PLACEHOLDER_1.o_orderkey) AS C_COUNT, count(Int64(1)) AS
CUSTDIST\
- \n Sort: count(Int64(1)) DESC NULLS FIRST,
count(FILENAME_PLACEHOLDER_1.o_orderkey) DESC NULLS FIRST\
- \n Projection: count(FILENAME_PLACEHOLDER_1.o_orderkey),
count(Int64(1))\
- \n Aggregate:
groupBy=[[count(FILENAME_PLACEHOLDER_1.o_orderkey)]], aggr=[[count(Int64(1))]]\
- \n Projection: count(FILENAME_PLACEHOLDER_1.o_orderkey)\
- \n Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.c_custkey]],
aggr=[[count(FILENAME_PLACEHOLDER_1.o_orderkey)]]\
- \n Projection: FILENAME_PLACEHOLDER_0.c_custkey,
FILENAME_PLACEHOLDER_1.o_orderkey\
- \n Left Join: FILENAME_PLACEHOLDER_0.c_custkey =
FILENAME_PLACEHOLDER_1.o_custkey Filter: NOT FILENAME_PLACEHOLDER_1.o_comment
LIKE CAST(Utf8(\"%special%requests%\") AS Utf8)\
- \n TableScan: FILENAME_PLACEHOLDER_0
projection=[c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal,
c_mktsegment, c_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_1
projection=[o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate,
o_orderpriority, o_clerk, o_shippriority, o_comment]");
+ let plan_str = tpch_plan_to_string(13).await?;
+ assert_eq!(
+ plan_str,
+ "Projection: count(ORDERS.O_ORDERKEY) AS C_COUNT, count(Int64(1))
AS CUSTDIST\
+ \n Sort: count(Int64(1)) DESC NULLS FIRST,
count(ORDERS.O_ORDERKEY) DESC NULLS FIRST\
+ \n Projection: count(ORDERS.O_ORDERKEY), count(Int64(1))\
+ \n Aggregate: groupBy=[[count(ORDERS.O_ORDERKEY)]],
aggr=[[count(Int64(1))]]\
+ \n Projection: count(ORDERS.O_ORDERKEY)\
+ \n Aggregate: groupBy=[[CUSTOMER.C_CUSTKEY]],
aggr=[[count(ORDERS.O_ORDERKEY)]]\
+ \n Projection: CUSTOMER.C_CUSTKEY, ORDERS.O_ORDERKEY\
+ \n Left Join: CUSTOMER.C_CUSTKEY = ORDERS.O_CUSTKEY
Filter: NOT ORDERS.O_COMMENT LIKE CAST(Utf8(\"%special%requests%\") AS Utf8)\
+ \n TableScan: CUSTOMER projection=[C_CUSTKEY,
C_NAME, C_ADDRESS, C_NATIONKEY, C_PHONE, C_ACCTBAL, C_MKTSEGMENT, C_COMMENT]\
+ \n TableScan: ORDERS projection=[O_ORDERKEY,
O_CUSTKEY, O_ORDERSTATUS, O_TOTALPRICE, O_ORDERDATE, O_ORDERPRIORITY, O_CLERK,
O_SHIPPRIORITY, O_COMMENT]"
+ );
Ok(())
}
#[tokio::test]
async fn tpch_test_14() -> Result<()> {
- let ctx = create_context(vec![
- ("FILENAME_PLACEHOLDER_0", "tests/testdata/tpch/lineitem.csv"),
- ("FILENAME_PLACEHOLDER_1", "tests/testdata/tpch/part.csv"),
- ])
- .await?;
- let path = "tests/testdata/tpch_substrait_plans/query_14.json";
- let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
- File::open(path).expect("file not found"),
- ))
- .expect("failed to parse json");
+ let plan_str = tpch_plan_to_string(14).await?;
+ assert_eq!(
+ plan_str,
+ "Projection: Decimal128(Some(10000),5,2) * sum(CASE WHEN
PART.P_TYPE LIKE Utf8(\"PROMO%\") THEN LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT ELSE Decimal128(Some(0),19,4) END) /
sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT) AS PROMO_REVENUE\
+ \n Aggregate: groupBy=[[]], aggr=[[sum(CASE WHEN PART.P_TYPE LIKE
Utf8(\"PROMO%\") THEN LINEITEM.L_EXTENDEDPRICE * Int32(1) - LINEITEM.L_DISCOUNT
ELSE Decimal128(Some(0),19,4) END), sum(LINEITEM.L_EXTENDEDPRICE * Int32(1) -
LINEITEM.L_DISCOUNT)]]\
+ \n Projection: CASE WHEN PART.P_TYPE LIKE CAST(Utf8(\"PROMO%\")
AS Utf8) THEN LINEITEM.L_EXTENDEDPRICE * (CAST(Int32(1) AS Decimal128(15, 2)) -
LINEITEM.L_DISCOUNT) ELSE Decimal128(Some(0),19,4) END,
LINEITEM.L_EXTENDEDPRICE * (CAST(Int32(1) AS Decimal128(15, 2)) -
LINEITEM.L_DISCOUNT)\
+ \n Filter: LINEITEM.L_PARTKEY = PART.P_PARTKEY AND
LINEITEM.L_SHIPDATE >= Date32(\"1995-09-01\") AND LINEITEM.L_SHIPDATE <
CAST(Utf8(\"1995-10-01\") AS Date32)\
+ \n CrossJoin:\
+ \n TableScan: LINEITEM projection=[L_ORDERKEY, L_PARTKEY,
L_SUPPKEY, L_LINENUMBER, L_QUANTITY, L_EXTENDEDPRICE, L_DISCOUNT, L_TAX,
L_RETURNFLAG, L_LINESTATUS, L_SHIPDATE, L_COMMITDATE, L_RECEIPTDATE,
L_SHIPINSTRUCT, L_SHIPMODE, L_COMMENT]\
+ \n TableScan: PART projection=[P_PARTKEY, P_NAME, P_MFGR,
P_BRAND, P_TYPE, P_SIZE, P_CONTAINER, P_RETAILPRICE, P_COMMENT]"
+ );
+ Ok(())
+ }
- let plan = from_substrait_plan(&ctx, &proto).await?;
- let plan_str = format!("{}", plan);
- assert_eq!(plan_str, "Projection: Decimal128(Some(10000),5,2) *
sum(CASE WHEN FILENAME_PLACEHOLDER_1.p_type LIKE Utf8(\"PROMO%\") THEN
FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_0.l_discount ELSE Decimal128(Some(0),19,0) END) /
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_0.l_discount) AS PROMO_REVENUE\
- \n Aggregate: groupBy=[[]], aggr=[[sum(CASE WHEN
FILENAME_PLACEHOLDER_1.p_type LIKE Utf8(\"PROMO%\") THEN
FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_0.l_discount ELSE Decimal128(Some(0),19,0) END),
sum(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) -
FILENAME_PLACEHOLDER_0.l_discount)]]\
- \n Projection: CASE WHEN FILENAME_PLACEHOLDER_1.p_type LIKE
CAST(Utf8(\"PROMO%\") AS Utf8) THEN FILENAME_PLACEHOLDER_0.l_extendedprice *
(CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount) ELSE
Decimal128(Some(0),19,0) END, FILENAME_PLACEHOLDER_0.l_extendedprice *
(CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount)\
- \n Filter: FILENAME_PLACEHOLDER_0.l_partkey =
FILENAME_PLACEHOLDER_1.p_partkey AND FILENAME_PLACEHOLDER_0.l_shipdate >=
Date32(\"1995-09-01\") AND FILENAME_PLACEHOLDER_0.l_shipdate <
CAST(Utf8(\"1995-10-01\") AS Date32)\
- \n Inner Join: Filter: Boolean(true)\
- \n TableScan: FILENAME_PLACEHOLDER_0 projection=[l_orderkey,
l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount,
l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate,
l_shipinstruct, l_shipmode, l_comment]\
- \n TableScan: FILENAME_PLACEHOLDER_1 projection=[p_partkey,
p_name, p_mfgr, p_brand, p_type, p_size, p_container, p_retailprice,
p_comment]");
+ #[ignore]
+ #[tokio::test]
Review Comment:
I wonder if we should be tracking in a ticket somewhere which TPC-H plans
aren't currently handled by the Substrait implementation 🤔
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]