Blizzara commented on code in PR #10842: URL: https://github.com/apache/datafusion/pull/10842#discussion_r1633462404
########## datafusion/substrait/src/logical_plan/consumer.rs: ########## @@ -22,6 +22,9 @@ use datafusion::arrow::datatypes::{ use datafusion::common::{ not_impl_err, substrait_datafusion_err, substrait_err, DFSchema, DFSchemaRef, }; +use substrait::proto::expression::literal::IntervalDayToSecond; Review Comment: nit: can we combine/move these into the other substrait imports below? ########## datafusion/substrait/src/logical_plan/consumer.rs: ########## @@ -569,7 +571,80 @@ pub async fn from_substrait_rel( Ok(LogicalPlan::Values(Values { schema, values })) } - _ => not_impl_err!("Only NamedTable and VirtualTable reads are supported"), + Some(ReadType::LocalFiles(lf)) => { + fn extract_filename(name: &str) -> Option<String> { + let corrected_url = + if name.starts_with("file://") && !name.starts_with("file:///") { + name.replacen("file://", "file:///", 1) + } else { + name.to_string() + }; + + Url::parse(&corrected_url).ok().and_then(|url| { + let path = url.path(); + std::path::Path::new(path) + .file_name() + .map(|filename| filename.to_string_lossy().to_string()) + }) + } + + // we could use the file name to check the original table provider + // TODO: currently does not support multiple local files + let filename: Option<String> = + lf.items.first().and_then(|x| match x.path_type.as_ref() { + Some(UriFile(name)) => extract_filename(name), + _ => None, + }); + + if lf.items.len() > 1 || filename.is_none() { + return not_impl_err!( + "Only NamedTable and VirtualTable reads are supported" + ); + } + let name = filename.unwrap(); + // directly use unwrap here since we could determine it is a valid one + let table_reference = TableReference::Bare { table: name.into() }; + let t = ctx.table(table_reference).await?; + let t = t.into_optimized_plan()?; + match &read.projection { Review Comment: Is this logic same as for NamedTable? If so, maybe extract into a function / reuse in some way? ########## datafusion/substrait/src/logical_plan/consumer.rs: ########## @@ -810,14 +885,21 @@ pub async fn from_substrait_agg_func( f.function_reference ); }; - + let function_name = function_name.split(':').next().unwrap_or(function_name); Review Comment: I guess this is same idea as in https://github.com/apache/datafusion/blob/59120255916bcd624161ce8f5df255f2cc838406/datafusion/substrait/src/logical_plan/consumer.rs#L132 ? Might be worth consolidating those into some helper function as well ########## datafusion/substrait/src/logical_plan/consumer.rs: ########## @@ -569,7 +571,80 @@ pub async fn from_substrait_rel( Ok(LogicalPlan::Values(Values { schema, values })) } - _ => not_impl_err!("Only NamedTable and VirtualTable reads are supported"), + Some(ReadType::LocalFiles(lf)) => { + fn extract_filename(name: &str) -> Option<String> { + let corrected_url = + if name.starts_with("file://") && !name.starts_with("file:///") { + name.replacen("file://", "file:///", 1) + } else { + name.to_string() + }; + + Url::parse(&corrected_url).ok().and_then(|url| { + let path = url.path(); + std::path::Path::new(path) + .file_name() + .map(|filename| filename.to_string_lossy().to_string()) + }) + } + + // we could use the file name to check the original table provider + // TODO: currently does not support multiple local files + let filename: Option<String> = + lf.items.first().and_then(|x| match x.path_type.as_ref() { + Some(UriFile(name)) => extract_filename(name), + _ => None, + }); + + if lf.items.len() > 1 || filename.is_none() { + return not_impl_err!( + "Only NamedTable and VirtualTable reads are supported" + ); + } + let name = filename.unwrap(); + // directly use unwrap here since we could determine it is a valid one + let table_reference = TableReference::Bare { table: name.into() }; + let t = ctx.table(table_reference).await?; + let t = t.into_optimized_plan()?; + match &read.projection { + Some(MaskExpression { select, .. }) => match &select.as_ref() { + Some(projection) => { + let column_indices: Vec<usize> = projection + .struct_items + .iter() + .map(|item| item.field as usize) + .collect(); + match &t { + LogicalPlan::TableScan(scan) => { + let fields = column_indices + .iter() + .map(|i| { + scan.projected_schema.qualified_field(*i) + }) + .map(|(qualifier, field)| { + (qualifier.cloned(), Arc::new(field.clone())) + }) + .collect(); + let mut scan = scan.clone(); + scan.projection = Some(column_indices); + scan.projected_schema = + DFSchemaRef::new(DFSchema::new_with_metadata( + fields, + HashMap::new(), + )?); + Ok(LogicalPlan::TableScan(scan)) + } + _ => plan_err!("unexpected plan for table"), + } + } + _ => Ok(t), + }, + _ => Ok(t), + } + } + _ => { + not_impl_err!("Only NamedTable and VirtualTable reads are supported") + } Review Comment: ```suggestion _ => not_impl_err!("Unsupported ReadType: {:?}", &read.as_ref().read_type), ``` ########## datafusion/substrait/src/logical_plan/consumer.rs: ########## @@ -569,7 +571,80 @@ pub async fn from_substrait_rel( Ok(LogicalPlan::Values(Values { schema, values })) } - _ => not_impl_err!("Only NamedTable and VirtualTable reads are supported"), + Some(ReadType::LocalFiles(lf)) => { + fn extract_filename(name: &str) -> Option<String> { + let corrected_url = + if name.starts_with("file://") && !name.starts_with("file:///") { + name.replacen("file://", "file:///", 1) + } else { + name.to_string() + }; + + Url::parse(&corrected_url).ok().and_then(|url| { + let path = url.path(); + std::path::Path::new(path) + .file_name() + .map(|filename| filename.to_string_lossy().to_string()) + }) + } + + // we could use the file name to check the original table provider + // TODO: currently does not support multiple local files + let filename: Option<String> = + lf.items.first().and_then(|x| match x.path_type.as_ref() { + Some(UriFile(name)) => extract_filename(name), + _ => None, + }); + + if lf.items.len() > 1 || filename.is_none() { + return not_impl_err!( + "Only NamedTable and VirtualTable reads are supported" Review Comment: ```suggestion "Only single file reads are supported" ``` ########## datafusion/substrait/tests/cases/tpch.rs: ########## @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! tests contains in <https://github.com/substrait-io/consumer-testing/tree/main/substrait_consumer/tests/integration/queries/tpch_substrait_plans> + +#[cfg(test)] +mod tests { + use datafusion::common::Result; + use datafusion::execution::options::CsvReadOptions; + use datafusion::prelude::SessionContext; + use datafusion_substrait::logical_plan::consumer::from_substrait_plan; + use std::fs::File; + use std::io::BufReader; + use substrait::proto::Plan; + + #[tokio::test] + async fn tpch_test_1() -> Result<()> { + let ctx = create_context().await?; + let path = "tests/testdata/query_1.json"; + let proto = serde_json::from_reader::<_, Plan>(BufReader::new( + File::open(path).expect("file not found"), + )) + .expect("failed to parse json"); + + let plan = from_substrait_plan(&ctx, &proto).await?; + + assert!( + format!("{:?}", plan).eq_ignore_ascii_case( + "Sort: FILENAME_PLACEHOLDER_0.l_returnflag ASC NULLS LAST, FILENAME_PLACEHOLDER_0.l_linestatus ASC NULLS LAST\n \ + Aggregate: groupBy=[[FILENAME_PLACEHOLDER_0.l_returnflag, FILENAME_PLACEHOLDER_0.l_linestatus]], aggr=[[SUM(FILENAME_PLACEHOLDER_0.l_quantity), SUM(FILENAME_PLACEHOLDER_0.l_extendedprice), SUM(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount), SUM(FILENAME_PLACEHOLDER_0.l_extendedprice * Int32(1) - FILENAME_PLACEHOLDER_0.l_discount * Int32(1) + FILENAME_PLACEHOLDER_0.l_tax), AVG(FILENAME_PLACEHOLDER_0.l_quantity), AVG(FILENAME_PLACEHOLDER_0.l_extendedprice), AVG(FILENAME_PLACEHOLDER_0.l_discount), COUNT(Int64(1))]]\n \ + Projection: FILENAME_PLACEHOLDER_0.l_returnflag, FILENAME_PLACEHOLDER_0.l_linestatus, FILENAME_PLACEHOLDER_0.l_quantity, FILENAME_PLACEHOLDER_0.l_extendedprice, FILENAME_PLACEHOLDER_0.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount), FILENAME_PLACEHOLDER_0.l_extendedprice * (CAST(Int32(1) AS Decimal128(19, 0)) - FILENAME_PLACEHOLDER_0.l_discount) * (CAST(Int32(1) AS Decimal128(19, 0)) + FILENAME_PLACEHOLDER_0.l_tax), FILENAME_PLACEHOLDER_0.l_discount\n \ + Filter: FILENAME_PLACEHOLDER_0.l_shipdate <= Date32(\"1998-12-01\") - IntervalDayTime(\"IntervalDayTime { days: 120, milliseconds: 0 }\")\n \ + TableScan: FILENAME_PLACEHOLDER_0 projection=[l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment]" + ) + ); + Ok(()) + } + + async fn create_context() -> datafusion::common::Result<SessionContext> { + let ctx = SessionContext::new(); + ctx.register_csv( Review Comment: the substrait plan indicates the files would be parquet, I wonder if that'll cause trouble now or later given we use csv (and it makes sense to use CSV here, I think) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org