vbarua commented on code in PR #13803: URL: https://github.com/apache/datafusion/pull/13803#discussion_r1887852717
########## datafusion/substrait/src/logical_plan/consumer.rs: ########## @@ -557,570 +977,510 @@ fn make_renamed_schema( ) } -/// Convert Substrait Rel to DataFusion DataFrame -#[allow(deprecated)] #[async_recursion] -pub async fn from_substrait_rel( - state: &dyn SubstraitPlanningState, - rel: &Rel, - extensions: &Extensions, +pub async fn from_project_rel( + consumer: &impl SubstraitConsumer, + p: &ProjectRel, ) -> Result<LogicalPlan> { - let plan: Result<LogicalPlan> = match &rel.rel_type { - Some(RelType::Project(p)) => { - if let Some(input) = p.input.as_ref() { - let mut input = LogicalPlanBuilder::from( - from_substrait_rel(state, input, extensions).await?, - ); - let original_schema = input.schema().clone(); - - // Ensure that all expressions have a unique display name, so that - // validate_unique_names does not fail when constructing the project. - let mut name_tracker = NameTracker::new(); - - // By default, a Substrait Project emits all inputs fields followed by all expressions. - // We build the explicit expressions first, and then the input expressions to avoid - // adding aliases to the explicit expressions (as part of ensuring unique names). - // - // This is helpful for plan visualization and tests, because when DataFusion produces - // Substrait Projects it adds an output mapping that excludes all input columns - // leaving only explicit expressions. - - let mut explicit_exprs: Vec<Expr> = vec![]; - for expr in &p.expressions { - let e = from_substrait_rex( - state, - expr, - input.clone().schema(), - extensions, - ) - .await?; - // if the expression is WindowFunction, wrap in a Window relation - if let Expr::WindowFunction(_) = &e { - // Adding the same expression here and in the project below - // works because the project's builder uses columnize_expr(..) - // to transform it into a column reference - input = input.window(vec![e.clone()])? - } - explicit_exprs.push(name_tracker.get_uniquely_named_expr(e)?); - } - - let mut final_exprs: Vec<Expr> = vec![]; - for index in 0..original_schema.fields().len() { - let e = Expr::Column(Column::from( - original_schema.qualified_field(index), - )); - final_exprs.push(name_tracker.get_uniquely_named_expr(e)?); - } - final_exprs.append(&mut explicit_exprs); - - input.project(final_exprs)?.build() - } else { - not_impl_err!("Projection without an input is not supported") + if let Some(input) = p.input.as_ref() { + let mut input = + LogicalPlanBuilder::from(from_substrait_rel(consumer, input).await?); + let original_schema = input.schema().clone(); + + // Ensure that all expressions have a unique display name, so that + // validate_unique_names does not fail when constructing the project. + let mut name_tracker = NameTracker::new(); + + // By default, a Substrait Project emits all inputs fields followed by all expressions. + // We build the explicit expressions first, and then the input expressions to avoid + // adding aliases to the explicit expressions (as part of ensuring unique names). + // + // This is helpful for plan visualization and tests, because when DataFusion produces + // Substrait Projects it adds an output mapping that excludes all input columns + // leaving only explicit expressions. + + let mut explicit_exprs: Vec<Expr> = vec![]; + for expr in &p.expressions { + let e = from_substrait_rex(consumer, expr, input.clone().schema()).await?; + // if the expression is WindowFunction, wrap in a Window relation + if let Expr::WindowFunction(_) = &e { + // Adding the same expression here and in the project below + // works because the project's builder uses columnize_expr(..) + // to transform it into a column reference + input = input.window(vec![e.clone()])? } + explicit_exprs.push(name_tracker.get_uniquely_named_expr(e)?); } - Some(RelType::Filter(filter)) => { - if let Some(input) = filter.input.as_ref() { - let input = LogicalPlanBuilder::from( - from_substrait_rel(state, input, extensions).await?, - ); - if let Some(condition) = filter.condition.as_ref() { - let expr = - from_substrait_rex(state, condition, input.schema(), extensions) - .await?; - input.filter(expr)?.build() - } else { - not_impl_err!("Filter without an condition is not valid") - } - } else { - not_impl_err!("Filter without an input is not valid") - } + + let mut final_exprs: Vec<Expr> = vec![]; + for index in 0..original_schema.fields().len() { + let e = Expr::Column(Column::from(original_schema.qualified_field(index))); + final_exprs.push(name_tracker.get_uniquely_named_expr(e)?); } - Some(RelType::Fetch(fetch)) => { - if let Some(input) = fetch.input.as_ref() { - let input = LogicalPlanBuilder::from( - from_substrait_rel(state, input, extensions).await?, - ); - let offset = fetch.offset as usize; - // -1 means that ALL records should be returned - let count = if fetch.count == -1 { - None - } else { - Some(fetch.count as usize) - }; - input.limit(offset, count)?.build() - } else { - not_impl_err!("Fetch without an input is not valid") - } + final_exprs.append(&mut explicit_exprs); + input.project(final_exprs)?.build() + } else { + not_impl_err!("Projection without an input is not supported") + } +} + +#[async_recursion] +pub async fn from_filter_rel( + consumer: &impl SubstraitConsumer, + filter: &FilterRel, +) -> Result<LogicalPlan> { + if let Some(input) = filter.input.as_ref() { + let input = LogicalPlanBuilder::from(from_substrait_rel(consumer, input).await?); + if let Some(condition) = filter.condition.as_ref() { + let expr = from_substrait_rex(consumer, condition, input.schema()).await?; + input.filter(expr)?.build() + } else { + not_impl_err!("Filter without an condition is not valid") } - Some(RelType::Sort(sort)) => { - if let Some(input) = sort.input.as_ref() { - let input = LogicalPlanBuilder::from( - from_substrait_rel(state, input, extensions).await?, - ); - let sorts = - from_substrait_sorts(state, &sort.sorts, input.schema(), extensions) - .await?; - input.sort(sorts)?.build() - } else { - not_impl_err!("Sort without an input is not valid") - } + } else { + not_impl_err!("Filter without an input is not valid") + } +} + +#[async_recursion] +pub async fn from_fetch_rel( + consumer: &impl SubstraitConsumer, + fetch: &FetchRel, +) -> Result<LogicalPlan> { + if let Some(input) = fetch.input.as_ref() { + let input = LogicalPlanBuilder::from(from_substrait_rel(consumer, input).await?); + let offset = fetch.offset as usize; + // -1 means that ALL records should be returned + let count = if fetch.count == -1 { + None + } else { + Some(fetch.count as usize) + }; + input.limit(offset, count)?.build() + } else { + not_impl_err!("Fetch without an input is not valid") + } +} + +pub async fn from_sort_rel( + consumer: &impl SubstraitConsumer, + sort: &SortRel, +) -> Result<LogicalPlan> { + if let Some(input) = sort.input.as_ref() { + let input = LogicalPlanBuilder::from(from_substrait_rel(consumer, input).await?); + let sorts = from_substrait_sorts(consumer, &sort.sorts, input.schema()).await?; + input.sort(sorts)?.build() + } else { + not_impl_err!("Sort without an input is not valid") + } +} + +pub async fn from_aggregate_rel( + consumer: &impl SubstraitConsumer, + agg: &AggregateRel, +) -> Result<LogicalPlan> { + if let Some(input) = agg.input.as_ref() { + let input = LogicalPlanBuilder::from(from_substrait_rel(consumer, input).await?); + let mut ref_group_exprs = vec![]; + + for e in &agg.grouping_expressions { + let x = from_substrait_rex(consumer, e, input.schema()).await?; + ref_group_exprs.push(x); } - Some(RelType::Aggregate(agg)) => { - if let Some(input) = agg.input.as_ref() { - let input = LogicalPlanBuilder::from( - from_substrait_rel(state, input, extensions).await?, - ); - let mut ref_group_exprs = vec![]; - for e in &agg.grouping_expressions { - let x = - from_substrait_rex(state, e, input.schema(), extensions).await?; - ref_group_exprs.push(x); + let mut group_exprs = vec![]; + let mut aggr_exprs = vec![]; + + match agg.groupings.len() { + 1 => { + group_exprs.extend_from_slice( + &from_substrait_grouping( + consumer, + &agg.groupings[0], + &ref_group_exprs, + input.schema(), + ) + .await?, + ); + } + _ => { + let mut grouping_sets = vec![]; + for grouping in &agg.groupings { + let grouping_set = from_substrait_grouping( + consumer, + grouping, + &ref_group_exprs, + input.schema(), + ) + .await?; + grouping_sets.push(grouping_set); } + // Single-element grouping expression of type Expr::GroupingSet. + // Note that GroupingSet::Rollup would become GroupingSet::GroupingSets, when + // parsed by the producer and consumer, since Substrait does not have a type dedicated + // to ROLLUP. Only vector of Groupings (grouping sets) is available. + group_exprs + .push(Expr::GroupingSet(GroupingSet::GroupingSets(grouping_sets))); + } + }; - let mut group_exprs = vec![]; - let mut aggr_exprs = vec![]; - - match agg.groupings.len() { - 1 => { - group_exprs.extend_from_slice( - &from_substrait_grouping( - state, - &agg.groupings[0], - &ref_group_exprs, - input.schema(), - extensions, - ) - .await?, - ); - } - _ => { - let mut grouping_sets = vec![]; - for grouping in &agg.groupings { - let grouping_set = from_substrait_grouping( - state, - grouping, - &ref_group_exprs, - input.schema(), - extensions, - ) - .await?; - grouping_sets.push(grouping_set); + for m in &agg.measures { + let filter = match &m.filter { + Some(fil) => Some(Box::new( + from_substrait_rex(consumer, fil, input.schema()).await?, + )), + None => None, + }; + let agg_func = match &m.measure { + Some(f) => { + let distinct = match f.invocation { + _ if f.invocation == AggregationInvocation::Distinct as i32 => { + true } - // Single-element grouping expression of type Expr::GroupingSet. - // Note that GroupingSet::Rollup would become GroupingSet::GroupingSets, when - // parsed by the producer and consumer, since Substrait does not have a type dedicated - // to ROLLUP. Only vector of Groupings (grouping sets) is available. - group_exprs.push(Expr::GroupingSet(GroupingSet::GroupingSets( - grouping_sets, - ))); - } - }; - - for m in &agg.measures { - let filter = match &m.filter { - Some(fil) => Some(Box::new( - from_substrait_rex(state, fil, input.schema(), extensions) - .await?, - )), - None => None, + _ if f.invocation == AggregationInvocation::All as i32 => false, + _ => false, }; - let agg_func = match &m.measure { - Some(f) => { - let distinct = match f.invocation { - _ if f.invocation - == AggregationInvocation::Distinct as i32 => - { - true - } - _ if f.invocation - == AggregationInvocation::All as i32 => - { - false - } - _ => false, - }; - let order_by = if !f.sorts.is_empty() { - Some( - from_substrait_sorts( - state, - &f.sorts, - input.schema(), - extensions, - ) - .await?, - ) - } else { - None - }; - - from_substrait_agg_func( - state, - f, - input.schema(), - extensions, - filter, - order_by, - distinct, - ) - .await - } - None => not_impl_err!( - "Aggregate without aggregate function is not supported" - ), + let order_by = if !f.sorts.is_empty() { + Some( + from_substrait_sorts(consumer, &f.sorts, input.schema()) + .await?, + ) + } else { + None }; - aggr_exprs.push(agg_func?.as_ref().clone()); - } - input.aggregate(group_exprs, aggr_exprs)?.build() - } else { - not_impl_err!("Aggregate without an input is not valid") - } - } - Some(RelType::Join(join)) => { - if join.post_join_filter.is_some() { - return not_impl_err!( - "JoinRel with post_join_filter is not yet supported" - ); - } - let left: LogicalPlanBuilder = LogicalPlanBuilder::from( - from_substrait_rel(state, join.left.as_ref().unwrap(), extensions) - .await?, - ); - let right = LogicalPlanBuilder::from( - from_substrait_rel(state, join.right.as_ref().unwrap(), extensions) - .await?, - ); - let (left, right) = requalify_sides_if_needed(left, right)?; - - let join_type = from_substrait_jointype(join.r#type)?; - // The join condition expression needs full input schema and not the output schema from join since we lose columns from - // certain join types such as semi and anti joins - let in_join_schema = left.schema().join(right.schema())?; - - // If join expression exists, parse the `on` condition expression, build join and return - // Otherwise, build join with only the filter, without join keys - match &join.expression.as_ref() { - Some(expr) => { - let on = from_substrait_rex(state, expr, &in_join_schema, extensions) - .await?; - // The join expression can contain both equal and non-equal ops. - // As of datafusion 31.0.0, the equal and non equal join conditions are in separate fields. - // So we extract each part as follows: - // - If an Eq or IsNotDistinctFrom op is encountered, add the left column, right column and is_null_equal_nulls to `join_ons` vector - // - Otherwise we add the expression to join_filter (use conjunction if filter already exists) - let (join_ons, nulls_equal_nulls, join_filter) = - split_eq_and_noneq_join_predicate_with_nulls_equality(&on); - let (left_cols, right_cols): (Vec<_>, Vec<_>) = - itertools::multiunzip(join_ons); - left.join_detailed( - right.build()?, - join_type, - (left_cols, right_cols), - join_filter, - nulls_equal_nulls, - )? - .build() + from_substrait_agg_func( + consumer, + f, + input.schema(), + filter, + order_by, + distinct, + ) + .await } None => { - let on: Vec<String> = vec![]; - left.join_detailed( - right.build()?, - join_type, - (on.clone(), on), - None, - false, - )? - .build() + not_impl_err!("Aggregate without aggregate function is not supported") } - } + }; + aggr_exprs.push(agg_func?.as_ref().clone()); } - Some(RelType::Cross(cross)) => { - let left = LogicalPlanBuilder::from( - from_substrait_rel(state, cross.left.as_ref().unwrap(), extensions) - .await?, - ); - let right = LogicalPlanBuilder::from( - from_substrait_rel(state, cross.right.as_ref().unwrap(), extensions) - .await?, - ); - let (left, right) = requalify_sides_if_needed(left, right)?; - left.cross_join(right.build()?)?.build() + input.aggregate(group_exprs, aggr_exprs)?.build() + } else { + not_impl_err!("Aggregate without an input is not valid") + } +} + +pub async fn from_join_rel( + consumer: &impl SubstraitConsumer, + join: &JoinRel, +) -> Result<LogicalPlan> { + if join.post_join_filter.is_some() { + return not_impl_err!("JoinRel with post_join_filter is not yet supported"); + } + + let left: LogicalPlanBuilder = LogicalPlanBuilder::from( + from_substrait_rel(consumer, join.left.as_ref().unwrap()).await?, + ); + let right = LogicalPlanBuilder::from( + from_substrait_rel(consumer, join.right.as_ref().unwrap()).await?, + ); + let (left, right) = requalify_sides_if_needed(left, right)?; + + let join_type = from_substrait_jointype(join.r#type)?; + // The join condition expression needs full input schema and not the output schema from join since we lose columns from + // certain join types such as semi and anti joins + let in_join_schema = left.schema().join(right.schema())?; + + // If join expression exists, parse the `on` condition expression, build join and return + // Otherwise, build join with only the filter, without join keys + match &join.expression.as_ref() { + Some(expr) => { + let on = from_substrait_rex(consumer, expr, &in_join_schema).await?; + // The join expression can contain both equal and non-equal ops. + // As of datafusion 31.0.0, the equal and non equal join conditions are in separate fields. + // So we extract each part as follows: + // - If an Eq or IsNotDistinctFrom op is encountered, add the left column, right column and is_null_equal_nulls to `join_ons` vector + // - Otherwise we add the expression to join_filter (use conjunction if filter already exists) + let (join_ons, nulls_equal_nulls, join_filter) = + split_eq_and_noneq_join_predicate_with_nulls_equality(&on); + let (left_cols, right_cols): (Vec<_>, Vec<_>) = + itertools::multiunzip(join_ons); + left.join_detailed( + right.build()?, + join_type, + (left_cols, right_cols), + join_filter, + nulls_equal_nulls, + )? + .build() } - Some(RelType::Read(read)) => { - async fn read_with_schema( - state: &dyn SubstraitPlanningState, - table_ref: TableReference, - schema: DFSchema, - projection: &Option<MaskExpression>, - ) -> Result<LogicalPlan> { - let schema = schema.replace_qualifier(table_ref.clone()); - - let plan = { - let provider = match state.table(&table_ref).await? { - Some(ref provider) => Arc::clone(provider), - _ => return plan_err!("No table named '{table_ref}'"), - }; + None => { + let on: Vec<String> = vec![]; + left.join_detailed(right.build()?, join_type, (on.clone(), on), None, false)? + .build() + } + } +} - LogicalPlanBuilder::scan( - table_ref, - provider_as_source(Arc::clone(&provider)), - None, - )? - .build()? - }; +pub async fn from_cross_rel( + consumer: &impl SubstraitConsumer, + cross: &CrossRel, +) -> Result<LogicalPlan> { + let left = LogicalPlanBuilder::from( + from_substrait_rel(consumer, cross.left.as_ref().unwrap()).await?, + ); + let right = LogicalPlanBuilder::from( + from_substrait_rel(consumer, cross.right.as_ref().unwrap()).await?, + ); + let (left, right) = requalify_sides_if_needed(left, right)?; + left.cross_join(right.build()?)?.build() +} - ensure_schema_compatability(plan.schema(), schema.clone())?; +#[allow(deprecated)] +pub async fn from_read_rel( + consumer: &impl SubstraitConsumer, + read: &ReadRel, +) -> Result<LogicalPlan> { + async fn read_with_schema( + consumer: &impl SubstraitConsumer, + table_ref: TableReference, + schema: DFSchema, + projection: &Option<MaskExpression>, + ) -> Result<LogicalPlan> { + let schema = schema.replace_qualifier(table_ref.clone()); + + let plan = { + let provider = match consumer.resolve_table_ref(&table_ref).await? { + Some(ref provider) => Arc::clone(provider), + _ => return plan_err!("No table named '{table_ref}'"), + }; - let schema = apply_masking(schema, projection)?; + LogicalPlanBuilder::scan( + table_ref, + provider_as_source(Arc::clone(&provider)), + None, + )? + .build()? + }; - apply_projection(plan, schema) - } + ensure_schema_compatability(plan.schema(), schema.clone())?; - let named_struct = read.base_schema.as_ref().ok_or_else(|| { - substrait_datafusion_err!("No base schema provided for Read Relation") - })?; + let schema = apply_masking(schema, projection)?; - let substrait_schema = from_substrait_named_struct(named_struct, extensions)?; + apply_projection(plan, schema) + } - match &read.as_ref().read_type { - Some(ReadType::NamedTable(nt)) => { - let table_reference = match nt.names.len() { - 0 => { - return plan_err!("No table name found in NamedTable"); - } - 1 => TableReference::Bare { - table: nt.names[0].clone().into(), - }, - 2 => TableReference::Partial { - schema: nt.names[0].clone().into(), - table: nt.names[1].clone().into(), - }, - _ => TableReference::Full { - catalog: nt.names[0].clone().into(), - schema: nt.names[1].clone().into(), - table: nt.names[2].clone().into(), - }, - }; + let named_struct = read.base_schema.as_ref().ok_or_else(|| { + substrait_datafusion_err!("No base schema provided for Read Relation") + })?; - read_with_schema( - state, - table_reference, - substrait_schema, - &read.projection, - ) - .await + let substrait_schema = from_substrait_named_struct(consumer, named_struct)?; + + match &read.read_type { + Some(ReadType::NamedTable(nt)) => { + let table_reference = match nt.names.len() { + 0 => { + return plan_err!("No table name found in NamedTable"); } - Some(ReadType::VirtualTable(vt)) => { - if vt.values.is_empty() { - return Ok(LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row: false, - schema: DFSchemaRef::new(substrait_schema), - })); + 1 => TableReference::Bare { + table: nt.names[0].clone().into(), + }, + 2 => TableReference::Partial { + schema: nt.names[0].clone().into(), + table: nt.names[1].clone().into(), + }, + _ => TableReference::Full { + catalog: nt.names[0].clone().into(), + schema: nt.names[1].clone().into(), + table: nt.names[2].clone().into(), + }, + }; + + read_with_schema( + consumer, + table_reference, + substrait_schema, + &read.projection, + ) + .await + } + Some(ReadType::VirtualTable(vt)) => { + if vt.values.is_empty() { + return Ok(LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: DFSchemaRef::new(substrait_schema), + })); + } + + let values = vt + .values + .iter() + .map(|row| { + let mut name_idx = 0; + let lits = row + .fields + .iter() + .map(|lit| { + name_idx += 1; // top-level names are provided through schema + Ok(Expr::Literal(from_substrait_literal( + consumer, + lit, + &named_struct.names, + &mut name_idx, + )?)) + }) + .collect::<Result<_>>()?; + if name_idx != named_struct.names.len() { + return substrait_err!( + "Names list must match exactly to nested schema, but found {} uses for {} names", + name_idx, + named_struct.names.len() + ); } + Ok(lits) + }) + .collect::<Result<_>>()?; + + Ok(LogicalPlan::Values(Values { + schema: DFSchemaRef::new(substrait_schema), + values, + })) + } + Some(ReadType::LocalFiles(lf)) => { + fn extract_filename(name: &str) -> Option<String> { + let corrected_url = + if name.starts_with("file://") && !name.starts_with("file:///") { + name.replacen("file://", "file:///", 1) + } else { + name.to_string() + }; + + Url::parse(&corrected_url).ok().and_then(|url| { + let path = url.path(); + std::path::Path::new(path) + .file_name() + .map(|filename| filename.to_string_lossy().to_string()) + }) + } + + // we could use the file name to check the original table provider + // TODO: currently does not support multiple local files + let filename: Option<String> = + lf.items.first().and_then(|x| match x.path_type.as_ref() { + Some(UriFile(name)) => extract_filename(name), + _ => None, + }); + + if lf.items.len() > 1 || filename.is_none() { + return not_impl_err!("Only single file reads are supported"); + } + let name = filename.unwrap(); + // directly use unwrap here since we could determine it is a valid one + let table_reference = TableReference::Bare { table: name.into() }; + + read_with_schema( + consumer, + table_reference, + substrait_schema, + &read.projection, + ) + .await + } + _ => { + not_impl_err!("Unsupported ReadType: {:?}", read.read_type) + } + } +} + +pub async fn from_set_rel( + consumer: &impl SubstraitConsumer, + set: &SetRel, +) -> Result<LogicalPlan> { + if set.inputs.len() < 2 { + substrait_err!("Set operation requires at least two inputs") + } else { + match set.op() { + SetOp::UnionAll => union_rels(consumer, &set.inputs, true).await, + SetOp::UnionDistinct => union_rels(consumer, &set.inputs, false).await, + SetOp::IntersectionPrimary => LogicalPlanBuilder::intersect( + from_substrait_rel(consumer, &set.inputs[0]).await?, + union_rels(consumer, &set.inputs[1..], true).await?, + false, + ), + SetOp::IntersectionMultiset => { + intersect_rels(consumer, &set.inputs, false).await + } + SetOp::IntersectionMultisetAll => { + intersect_rels(consumer, &set.inputs, true).await + } + SetOp::MinusPrimary => except_rels(consumer, &set.inputs, false).await, + SetOp::MinusPrimaryAll => except_rels(consumer, &set.inputs, true).await, + set_op => not_impl_err!("Unsupported set operator: {set_op:?}"), + } + } +} - let values = vt - .values - .iter() - .map(|row| { - let mut name_idx = 0; - let lits = row - .fields - .iter() - .map(|lit| { - name_idx += 1; // top-level names are provided through schema - Ok(Expr::Literal(from_substrait_literal( - lit, - extensions, - &named_struct.names, - &mut name_idx, - )?)) - }) - .collect::<Result<_>>()?; - if name_idx != named_struct.names.len() { - return substrait_err!( - "Names list must match exactly to nested schema, but found {} uses for {} names", - name_idx, - named_struct.names.len() - ); - } - Ok(lits) - }) - .collect::<Result<_>>()?; +pub async fn from_extension_leaf_rel( + consumer: &impl SubstraitConsumer, + extension_leaf_rel: &ExtensionLeafRel, +) -> Result<LogicalPlan> { + consumer.consume_extension_leaf(extension_leaf_rel).await +} - Ok(LogicalPlan::Values(Values { - schema: DFSchemaRef::new(substrait_schema), - values, - })) - } - Some(ReadType::LocalFiles(lf)) => { - fn extract_filename(name: &str) -> Option<String> { - let corrected_url = if name.starts_with("file://") - && !name.starts_with("file:///") - { - name.replacen("file://", "file:///", 1) - } else { - name.to_string() - }; +pub async fn from_extension_single_rel( + consumer: &impl SubstraitConsumer, + extension_single_rel: &ExtensionSingleRel, +) -> Result<LogicalPlan> { + consumer + .consume_extension_single(extension_single_rel) + .await +} - Url::parse(&corrected_url).ok().and_then(|url| { - let path = url.path(); - std::path::Path::new(path) - .file_name() - .map(|filename| filename.to_string_lossy().to_string()) - }) - } +pub async fn from_extension_multi_rel( + consumer: &impl SubstraitConsumer, + extension_multi_rel: &ExtensionMultiRel, +) -> Result<LogicalPlan> { + consumer.consume_extension_multi(extension_multi_rel).await +} - // we could use the file name to check the original table provider - // TODO: currently does not support multiple local files - let filename: Option<String> = - lf.items.first().and_then(|x| match x.path_type.as_ref() { - Some(UriFile(name)) => extract_filename(name), - _ => None, - }); +pub async fn from_exchange_rel( + consumer: &impl SubstraitConsumer, + exchange: &ExchangeRel, +) -> Result<LogicalPlan> { + let Some(input) = exchange.input.as_ref() else { + return substrait_err!("Unexpected empty input in ExchangeRel"); + }; + let input = Arc::new(from_substrait_rel(consumer, input).await?); - if lf.items.len() > 1 || filename.is_none() { - return not_impl_err!("Only single file reads are supported"); - } - let name = filename.unwrap(); - // directly use unwrap here since we could determine it is a valid one - let table_reference = TableReference::Bare { table: name.into() }; - - read_with_schema( - state, - table_reference, - substrait_schema, - &read.projection, - ) - .await - } - _ => { - not_impl_err!("Unsupported ReadType: {:?}", &read.as_ref().read_type) - } - } - } - Some(RelType::Set(set)) => match set_rel::SetOp::try_from(set.op) { - Ok(set_op) => { - if set.inputs.len() < 2 { - substrait_err!("Set operation requires at least two inputs") - } else { - match set_op { - set_rel::SetOp::UnionAll => { - union_rels(&set.inputs, state, extensions, true).await - } - set_rel::SetOp::UnionDistinct => { - union_rels(&set.inputs, state, extensions, false).await - } - set_rel::SetOp::IntersectionPrimary => { - LogicalPlanBuilder::intersect( - from_substrait_rel(state, &set.inputs[0], extensions) - .await?, - union_rels(&set.inputs[1..], state, extensions, true) - .await?, - false, - ) - } - set_rel::SetOp::IntersectionMultiset => { - intersect_rels(&set.inputs, state, extensions, false).await - } - set_rel::SetOp::IntersectionMultisetAll => { - intersect_rels(&set.inputs, state, extensions, true).await - } - set_rel::SetOp::MinusPrimary => { - except_rels(&set.inputs, state, extensions, false).await - } - set_rel::SetOp::MinusPrimaryAll => { - except_rels(&set.inputs, state, extensions, true).await - } - _ => not_impl_err!("Unsupported set operator: {set_op:?}"), - } - } + let Some(exchange_kind) = &exchange.exchange_kind else { + return substrait_err!("Unexpected empty input in ExchangeRel"); + }; + + // ref: https://substrait.io/relations/physical_relations/#exchange-types + let partitioning_scheme = match exchange_kind { + ExchangeKind::ScatterByFields(scatter_fields) => { + let mut partition_columns = vec![]; + let input_schema = input.schema(); + for field_ref in &scatter_fields.fields { + let column = from_substrait_field_reference(field_ref, input_schema)?; + partition_columns.push(column); } - Err(e) => not_impl_err!("Invalid set operation type {}: {e}", set.op), - }, - Some(RelType::ExtensionLeaf(extension)) => { - let Some(ext_detail) = &extension.detail else { - return substrait_err!("Unexpected empty detail in ExtensionLeafRel"); - }; - let plan = state - .serializer_registry() - .deserialize_logical_plan(&ext_detail.type_url, &ext_detail.value)?; - Ok(LogicalPlan::Extension(Extension { node: plan })) - } - Some(RelType::ExtensionSingle(extension)) => { - let Some(ext_detail) = &extension.detail else { - return substrait_err!("Unexpected empty detail in ExtensionSingleRel"); - }; - let plan = state - .serializer_registry() - .deserialize_logical_plan(&ext_detail.type_url, &ext_detail.value)?; Review Comment: The usage of `SerializerRegistry` is problematic here because `deserialize_logical_plan` assumes that the extension details are a fully formed `UserDefinedLogicalNode`. This _may_ sometimes be the case, but the purpose of the Substrait Extension relations is to allow users to encoded arbitrary relations into Substrait in a manner they see fit. The only really foolproof way of handling this is to provide a hook like `consume_extension_single` which users can use to provide their deserialisation logic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org