This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-53 by this push:
new 9e70cdd18f [branch-53] fix: sqllogictest cannot convert <subquery> to Substrait (#19739) (#20897)
9e70cdd18f is described below
commit 9e70cdd18f5f67729a8f9505ca637ecbdbff0915
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 11:58:15 2026 -0400
[branch-53] fix: sqllogictest cannot convert <subquery> to Substrait (#19739) (#20897)
- Part of https://github.com/apache/datafusion/issues/19692
- Closes https://github.com/apache/datafusion/issues/16281 on branch-53
This PR:
- Backports https://github.com/apache/datafusion/pull/19739 (by @kumarUjjawal) to the branch-53 line
Co-authored-by: Kumar Ujjawal <[email protected]>
---
.../logical_plan/producer/expr/field_reference.rs | 16 ++
.../src/logical_plan/producer/expr/mod.rs | 8 +-
.../logical_plan/producer/expr/singular_or_list.rs | 22 +--
.../src/logical_plan/producer/expr/subquery.rs | 81 +++++++---
.../logical_plan/producer/substrait_producer.rs | 30 +++-
.../substrait/src/logical_plan/producer/utils.rs | 27 +++-
.../tests/cases/roundtrip_logical_plan.rs | 165 ++++++++++++++++++++-
7 files changed, 297 insertions(+), 52 deletions(-)
diff --git
a/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs
b/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs
index b6af7d3bbc..aa34317a6e 100644
--- a/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs
+++ b/datafusion/substrait/src/logical_plan/producer/expr/field_reference.rs
@@ -76,6 +76,22 @@ pub(crate) fn try_to_substrait_field_reference(
}
}
+/// Convert an outer reference column to a Substrait field reference.
+///
+/// Outer reference columns appear inside correlated subqueries and refer to a
+/// column of an enclosing query scope. Substrait models such references with
+/// the `OuterReference` root type, whose `steps_out` counts how many subquery
+/// boundaries to cross. This function always emits `steps_out = 1`, i.e. a
+/// reference into the immediately enclosing query.
+///
+/// `schema` must be the schema of that enclosing (outer) query: `col` is
+/// resolved to its positional index within it.
+///
+/// # Errors
+///
+/// Returns an error if `col` is not found in `schema`, or if the resolved
+/// index does not fit in the `i32` used by Substrait struct-field references.
+pub fn from_outer_reference_column(
+    col: &Column,
+    schema: &DFSchemaRef,
+) -> datafusion::common::Result<Expression> {
+    use substrait::proto::expression::field_reference::{
+        OuterReference, ReferenceType, RootType,
+    };
+    use substrait::proto::expression::reference_segment::StructField;
+    use substrait::proto::expression::{
+        FieldReference, ReferenceSegment, RexType, reference_segment,
+    };
+
+    let index = schema.index_of_column(col)?;
+    let field = i32::try_from(index).map_err(|_| {
+        datafusion::common::DataFusionError::Plan(format!(
+            "outer reference column index {index} does not fit in i32"
+        ))
+    })?;
+
+    // A plain (root) field reference would be resolved against the subquery's
+    // own input. Marking the root as `OuterReference` tells the consumer to
+    // resolve the index one scope out instead.
+    Ok(Expression {
+        rex_type: Some(RexType::Selection(Box::new(FieldReference {
+            reference_type: Some(ReferenceType::DirectReference(ReferenceSegment {
+                reference_type: Some(reference_segment::ReferenceType::StructField(
+                    Box::new(StructField { field, child: None }),
+                )),
+            })),
+            root_type: Some(RootType::OuterReference(OuterReference { steps_out: 1 })),
+        }))),
+    })
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/datafusion/substrait/src/logical_plan/producer/expr/mod.rs
b/datafusion/substrait/src/logical_plan/producer/expr/mod.rs
index 74b1a65215..3aa8aa2b68 100644
--- a/datafusion/substrait/src/logical_plan/producer/expr/mod.rs
+++ b/datafusion/substrait/src/logical_plan/producer/expr/mod.rs
@@ -139,17 +139,17 @@ pub fn to_substrait_rex(
}
Expr::WindowFunction(expr) => producer.handle_window_function(expr,
schema),
Expr::InList(expr) => producer.handle_in_list(expr, schema),
- Expr::Exists(expr) => not_impl_err!("Cannot convert {expr:?} to
Substrait"),
+ Expr::Exists(expr) => producer.handle_exists(expr, schema),
Expr::InSubquery(expr) => producer.handle_in_subquery(expr, schema),
Expr::SetComparison(expr) => producer.handle_set_comparison(expr,
schema),
- Expr::ScalarSubquery(expr) => {
- not_impl_err!("Cannot convert {expr:?} to Substrait")
- }
+ Expr::ScalarSubquery(expr) => producer.handle_scalar_subquery(expr,
schema),
#[expect(deprecated)]
Expr::Wildcard { .. } => not_impl_err!("Cannot convert {expr:?} to
Substrait"),
Expr::GroupingSet(expr) => not_impl_err!("Cannot convert {expr:?} to
Substrait"),
Expr::Placeholder(expr) => not_impl_err!("Cannot convert {expr:?} to
Substrait"),
Expr::OuterReferenceColumn(_, _) => {
+ // OuterReferenceColumn requires tracking outer query schema
context for correlated
+ // subqueries. This is a complex feature that is not yet
implemented.
not_impl_err!("Cannot convert {expr:?} to Substrait")
}
Expr::Unnest(expr) => not_impl_err!("Cannot convert {expr:?} to
Substrait"),
diff --git
a/datafusion/substrait/src/logical_plan/producer/expr/singular_or_list.rs
b/datafusion/substrait/src/logical_plan/producer/expr/singular_or_list.rs
index 2d53db6501..fd09a60d5e 100644
--- a/datafusion/substrait/src/logical_plan/producer/expr/singular_or_list.rs
+++ b/datafusion/substrait/src/logical_plan/producer/expr/singular_or_list.rs
@@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use crate::logical_plan::producer::SubstraitProducer;
+use crate::logical_plan::producer::{SubstraitProducer, negate};
use datafusion::common::DFSchemaRef;
use datafusion::logical_expr::expr::InList;
-use substrait::proto::expression::{RexType, ScalarFunction, SingularOrList};
-use substrait::proto::function_argument::ArgType;
-use substrait::proto::{Expression, FunctionArgument};
+use substrait::proto::Expression;
+use substrait::proto::expression::{RexType, SingularOrList};
pub fn from_in_list(
producer: &mut impl SubstraitProducer,
@@ -46,20 +45,7 @@ pub fn from_in_list(
};
if *negated {
- let function_anchor = producer.register_function("not".to_string());
-
- #[expect(deprecated)]
- Ok(Expression {
- rex_type: Some(RexType::ScalarFunction(ScalarFunction {
- function_reference: function_anchor,
- arguments: vec![FunctionArgument {
- arg_type: Some(ArgType::Value(substrait_or_list)),
- }],
- output_type: None,
- args: vec![],
- options: vec![],
- })),
- })
+ Ok(negate(producer, substrait_or_list))
} else {
Ok(substrait_or_list)
}
diff --git a/datafusion/substrait/src/logical_plan/producer/expr/subquery.rs
b/datafusion/substrait/src/logical_plan/producer/expr/subquery.rs
index e5b9241c10..97699c2132 100644
--- a/datafusion/substrait/src/logical_plan/producer/expr/subquery.rs
+++ b/datafusion/substrait/src/logical_plan/producer/expr/subquery.rs
@@ -15,15 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-use crate::logical_plan::producer::SubstraitProducer;
+use crate::logical_plan::producer::{SubstraitProducer, negate};
use datafusion::common::{DFSchemaRef, substrait_err};
-use datafusion::logical_expr::Operator;
-use datafusion::logical_expr::expr::{InSubquery, SetComparison, SetQuantifier};
-use substrait::proto::expression::subquery::InPredicate;
+use datafusion::logical_expr::expr::{Exists, InSubquery, SetComparison,
SetQuantifier};
+use datafusion::logical_expr::{Operator, Subquery};
+use substrait::proto::Expression;
+use substrait::proto::expression::RexType;
use substrait::proto::expression::subquery::set_comparison::{ComparisonOp,
ReductionOp};
-use substrait::proto::expression::{RexType, ScalarFunction};
-use substrait::proto::function_argument::ArgType;
-use substrait::proto::{Expression, FunctionArgument};
+use substrait::proto::expression::subquery::{InPredicate, Scalar,
SetPredicate};
pub fn from_in_subquery(
producer: &mut impl SubstraitProducer,
@@ -54,20 +53,7 @@ pub fn from_in_subquery(
))),
};
if *negated {
- let function_anchor = producer.register_function("not".to_string());
-
- #[expect(deprecated)]
- Ok(Expression {
- rex_type: Some(RexType::ScalarFunction(ScalarFunction {
- function_reference: function_anchor,
- arguments: vec![FunctionArgument {
- arg_type: Some(ArgType::Value(substrait_subquery)),
- }],
- output_type: None,
- args: vec![],
- options: vec![],
- })),
- })
+ Ok(negate(producer, substrait_subquery))
} else {
Ok(substrait_subquery)
}
@@ -122,3 +108,56 @@ pub fn from_set_comparison(
))),
})
}
+
+/// Convert a DataFusion [`Subquery`] used as a scalar expression into a
+/// Substrait `Scalar` subquery expression.
+///
+/// The inner plan is converted with [`SubstraitProducer::handle_plan`]. Neither
+/// `_schema` nor `subquery.outer_ref_columns` is consulted here.
+pub fn from_scalar_subquery(
+    producer: &mut impl SubstraitProducer,
+    subquery: &Subquery,
+    _schema: &DFSchemaRef,
+) -> datafusion::common::Result<Expression> {
+    use substrait::proto::expression::subquery::SubqueryType;
+
+    let scalar = Scalar {
+        input: Some(producer.handle_plan(subquery.subquery.as_ref())?),
+    };
+    let wrapped = substrait::proto::expression::Subquery {
+        subquery_type: Some(SubqueryType::Scalar(Box::new(scalar))),
+    };
+
+    Ok(Expression {
+        rex_type: Some(RexType::Subquery(Box::new(wrapped))),
+    })
+}
+
+/// Convert a DataFusion [`Exists`] predicate into a Substrait `SetPredicate`
+/// subquery using the `EXISTS` predicate operator.
+///
+/// A negated predicate (`NOT EXISTS`) is emitted as `not(EXISTS (...))` via
+/// [`negate`], which registers the `not` function with `producer`.
+/// `_schema` is not consulted.
+pub fn from_exists(
+    producer: &mut impl SubstraitProducer,
+    exists: &Exists,
+    _schema: &DFSchemaRef,
+) -> datafusion::common::Result<Expression> {
+    use substrait::proto::expression::subquery::SubqueryType;
+    use substrait::proto::expression::subquery::set_predicate::PredicateOp;
+
+    let tuples = producer.handle_plan(exists.subquery.subquery.as_ref())?;
+    let set_predicate = SetPredicate {
+        predicate_op: PredicateOp::Exists as i32,
+        tuples: Some(tuples),
+    };
+    let exists_expr = Expression {
+        rex_type: Some(RexType::Subquery(Box::new(
+            substrait::proto::expression::Subquery {
+                subquery_type: Some(SubqueryType::SetPredicate(Box::new(set_predicate))),
+            },
+        ))),
+    };
+
+    Ok(match exists.negated {
+        true => negate(producer, exists_expr),
+        false => exists_expr,
+    })
+}
diff --git
a/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs
b/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs
index c7518bd04e..51d2c0ca8e 100644
--- a/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs
+++ b/datafusion/substrait/src/logical_plan/producer/substrait_producer.rs
@@ -18,18 +18,19 @@
use crate::extensions::Extensions;
use crate::logical_plan::producer::{
from_aggregate, from_aggregate_function, from_alias, from_between,
from_binary_expr,
- from_case, from_cast, from_column, from_distinct, from_empty_relation,
from_filter,
- from_in_list, from_in_subquery, from_join, from_like, from_limit,
from_literal,
- from_projection, from_repartition, from_scalar_function,
from_set_comparison,
- from_sort, from_subquery_alias, from_table_scan, from_try_cast,
from_unary_expr,
- from_union, from_values, from_window, from_window_function,
to_substrait_rel,
- to_substrait_rex,
+ from_case, from_cast, from_column, from_distinct, from_empty_relation,
from_exists,
+ from_filter, from_in_list, from_in_subquery, from_join, from_like,
from_limit,
+ from_literal, from_projection, from_repartition, from_scalar_function,
+ from_scalar_subquery, from_set_comparison, from_sort, from_subquery_alias,
+ from_table_scan, from_try_cast, from_unary_expr, from_union, from_values,
+ from_window, from_window_function, to_substrait_rel, to_substrait_rex,
};
use datafusion::common::{Column, DFSchemaRef, ScalarValue, substrait_err};
use datafusion::execution::SessionState;
use datafusion::execution::registry::SerializerRegistry;
+use datafusion::logical_expr::Subquery;
use datafusion::logical_expr::expr::{
- Alias, InList, InSubquery, SetComparison, WindowFunction,
+ Alias, Exists, InList, InSubquery, SetComparison, WindowFunction,
};
use datafusion::logical_expr::{
Aggregate, Between, BinaryExpr, Case, Cast, Distinct, EmptyRelation, Expr,
Extension,
@@ -372,6 +373,21 @@ pub trait SubstraitProducer: Send + Sync + Sized {
) -> datafusion::common::Result<Expression> {
from_set_comparison(self, set_comparison, schema)
}
+    /// Convert a scalar subquery expression to Substrait.
+    ///
+    /// The default implementation delegates to [`from_scalar_subquery`];
+    /// implementors may override it to customize the encoding.
+    fn handle_scalar_subquery(
+        &mut self,
+        subquery: &Subquery,
+        schema: &DFSchemaRef,
+    ) -> datafusion::common::Result<Expression> {
+        from_scalar_subquery(self, subquery, schema)
+    }
+
+    /// Convert an `[NOT] EXISTS (subquery)` expression to Substrait.
+    ///
+    /// The default implementation delegates to [`from_exists`];
+    /// implementors may override it to customize the encoding.
+    fn handle_exists(
+        &mut self,
+        exists: &Exists,
+        schema: &DFSchemaRef,
+    ) -> datafusion::common::Result<Expression> {
+        from_exists(self, exists, schema)
+    }
}
pub struct DefaultSubstraitProducer<'a> {
diff --git a/datafusion/substrait/src/logical_plan/producer/utils.rs
b/datafusion/substrait/src/logical_plan/producer/utils.rs
index 820c14809d..e8310f4acd 100644
--- a/datafusion/substrait/src/logical_plan/producer/utils.rs
+++ b/datafusion/substrait/src/logical_plan/producer/utils.rs
@@ -19,8 +19,8 @@ use crate::logical_plan::producer::SubstraitProducer;
use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
use datafusion::common::{DFSchemaRef, plan_err};
use datafusion::logical_expr::SortExpr;
-use substrait::proto::SortField;
use substrait::proto::sort_field::{SortDirection, SortKind};
+use substrait::proto::{Expression, SortField};
// Substrait wants a list of all field names, including nested fields from
structs,
// also from within e.g. lists and maps. However, it does not want the list
and map field names
@@ -85,3 +85,28 @@ pub(crate) fn to_substrait_precision(time_unit: &TimeUnit)
-> i32 {
TimeUnit::Nanosecond => 9,
}
}
+
+/// Wrap `expr` in a call to the Substrait `not` scalar function.
+///
+/// `not` is registered with `producer` through
+/// [`SubstraitProducer::register_function`], and the returned anchor is used as
+/// the function reference. No output type is set on the call.
+pub(crate) fn negate(
+    producer: &mut impl SubstraitProducer,
+    expr: Expression,
+) -> Expression {
+    use substrait::proto::FunctionArgument;
+    use substrait::proto::expression::{RexType, ScalarFunction};
+    use substrait::proto::function_argument::ArgType;
+
+    let argument = FunctionArgument {
+        arg_type: Some(ArgType::Value(expr)),
+    };
+    // `args` is deprecated in the Substrait proto but must still be initialized.
+    #[expect(deprecated)]
+    let call = ScalarFunction {
+        function_reference: producer.register_function("not".to_string()),
+        arguments: vec![argument],
+        output_type: None,
+        args: vec![],
+        options: vec![],
+    };
+
+    Expression {
+        rex_type: Some(RexType::ScalarFunction(call)),
+    }
+}
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index 926eb8a343..5dd4aa4e2b 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -34,7 +34,7 @@ use datafusion::error::Result;
use datafusion::execution::registry::SerializerRegistry;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::session_state::SessionStateBuilder;
-use datafusion::logical_expr::expr::{SetComparison, SetQuantifier};
+use datafusion::logical_expr::expr::{Exists, SetComparison, SetQuantifier};
use datafusion::logical_expr::{
EmptyRelation, Extension, InvariantLevel, LogicalPlan, Operator,
PartitionEvaluator,
Repartition, Subquery, UserDefinedLogicalNode, Values, Volatility,
@@ -713,6 +713,37 @@ async fn roundtrip_set_comparison_all_substrait() ->
Result<()> {
Ok(())
}
+#[tokio::test]
+async fn roundtrip_scalar_subquery_substrait() -> Result<()> {
+    let ctx = create_context().await?;
+    let original = build_scalar_subquery_projection_plan(&ctx).await?;
+
+    // Producer side: the projection must be encoded as a Substrait Scalar subquery.
+    let substrait_plan = to_substrait_plan(&original, &ctx.state())?;
+    assert_root_project_has_scalar_subquery(substrait_plan.as_ref());
+
+    // Consumer side: decoding must yield a projection with a ScalarSubquery again.
+    let decoded = from_substrait_plan(&ctx.state(), &substrait_plan).await?;
+    assert_projection_contains_scalar_subquery(&decoded);
+    Ok(())
+}
+
+/// Round-trips an uncorrelated `[NOT] EXISTS` filter plan through Substrait
+/// and checks the predicate shape of the decoded plan.
+async fn assert_exists_filter_roundtrip(negated: bool) -> Result<()> {
+    let ctx = create_context().await?;
+    let plan = build_exists_filter_plan(&ctx, negated).await?;
+    let proto = to_substrait_plan(&plan, &ctx.state())?;
+    let decoded = from_substrait_plan(&ctx.state(), &proto).await?;
+    assert_exists_predicate(&decoded, negated);
+    Ok(())
+}
+
+#[tokio::test]
+async fn roundtrip_exists_substrait() -> Result<()> {
+    assert_exists_filter_roundtrip(false).await
+}
+
+#[tokio::test]
+async fn roundtrip_not_exists_substrait() -> Result<()> {
+    assert_exists_filter_roundtrip(true).await
+}
+
#[tokio::test]
async fn roundtrip_not_exists_filter_left_anti_join() -> Result<()> {
let plan = generate_plan_from_sql(
@@ -1959,6 +1990,56 @@ async fn build_set_comparison_plan(
.build()
}
+/// Builds the equivalent of `SELECT (SELECT a FROM data2 LIMIT 1) AS sq` over a
+/// one-row empty relation: an uncorrelated scalar subquery in a projection.
+async fn build_scalar_subquery_projection_plan(
+    ctx: &SessionContext,
+) -> Result<LogicalPlan> {
+    let inner = LogicalPlanBuilder::from(ctx.table("data2").await?.into_unoptimized_plan())
+        .project(vec![col("a")])?
+        .limit(0, Some(1))?
+        .build()?;
+
+    let one_row = LogicalPlan::EmptyRelation(EmptyRelation {
+        produce_one_row: true,
+        schema: DFSchemaRef::new(DFSchema::empty()),
+    });
+    let sq = Expr::ScalarSubquery(Subquery {
+        subquery: Arc::new(inner),
+        outer_ref_columns: vec![],
+        spans: Spans::new(),
+    });
+
+    LogicalPlanBuilder::from(one_row)
+        .project(vec![sq.alias("sq")])?
+        .build()
+}
+
+/// Builds the equivalent of
+/// `SELECT data.a FROM data WHERE [NOT] EXISTS (SELECT data2.a FROM data2)`
+/// with an uncorrelated subquery; `negated` selects the `NOT EXISTS` form.
+async fn build_exists_filter_plan(
+    ctx: &SessionContext,
+    negated: bool,
+) -> Result<LogicalPlan> {
+    let outer = ctx.table("data").await?.into_unoptimized_plan();
+    let inner = LogicalPlanBuilder::from(ctx.table("data2").await?.into_unoptimized_plan())
+        .project(vec![col("data2.a")])?
+        .build()?;
+
+    let subquery = Subquery {
+        subquery: Arc::new(inner),
+        outer_ref_columns: vec![],
+        spans: Spans::new(),
+    };
+    let predicate = Expr::Exists(Exists::new(subquery, negated));
+
+    LogicalPlanBuilder::from(outer)
+        .filter(predicate)?
+        .project(vec![col("data.a")])?
+        .build()
+}
+
fn assert_set_comparison_predicate(
plan: &LogicalPlan,
expected_op: Operator,
@@ -1982,6 +2063,88 @@ fn assert_set_comparison_predicate(
}
}
+/// Asserts that `proto`'s first relation is a root whose input is a `Project`
+/// whose first expression is a Substrait `Scalar` subquery; panics otherwise.
+fn assert_root_project_has_scalar_subquery(proto: &Plan) {
+    use substrait::proto::expression::RexType as ExprRexType;
+    use substrait::proto::expression::subquery::SubqueryType;
+
+    let relation = proto
+        .relations
+        .first()
+        .expect("expected Substrait plan to have at least one relation");
+    let Some(plan_rel::RelType::Root(root)) = relation.rel_type.as_ref() else {
+        panic!("expected root relation, got {:?}", relation.rel_type);
+    };
+
+    let input = root.input.as_ref().expect("expected root input relation");
+    let Some(RelType::Project(project)) = input.rel_type.as_ref() else {
+        panic!("expected Project relation at root input, got {:?}", input.rel_type);
+    };
+
+    let first_expr = project
+        .expressions
+        .first()
+        .expect("expected at least one project expression");
+    let Some(ExprRexType::Subquery(subquery)) = first_expr.rex_type.as_ref() else {
+        panic!("expected Subquery expression, got {:?}", first_expr.rex_type);
+    };
+
+    assert!(
+        matches!(subquery.subquery_type, Some(SubqueryType::Scalar(_))),
+        "expected scalar subquery type"
+    );
+}
+
+/// Asserts that `plan` is a `Projection` and that at least one of its
+/// expressions satisfies `expr_contains_scalar_subquery`.
+fn assert_projection_contains_scalar_subquery(plan: &LogicalPlan) {
+    let LogicalPlan::Projection(projection) = plan else {
+        panic!("expected Projection plan, got {plan:?}");
+    };
+    assert!(
+        projection.expr.iter().any(expr_contains_scalar_subquery),
+        "expected Projection to contain ScalarSubquery expression"
+    );
+}
+
+/// Returns `true` if `expr` is a `ScalarSubquery`, looking through any number
+/// of `Alias` wrappers; every other expression kind yields `false`.
+fn expr_contains_scalar_subquery(expr: &Expr) -> bool {
+    let mut current = expr;
+    while let Expr::Alias(alias) = current {
+        current = alias.expr.as_ref();
+    }
+    matches!(current, Expr::ScalarSubquery(_))
+}
+
+/// Asserts that `plan` is a `Filter`, optionally under a top-level `Projection`.
+/// The filter's predicate must be `EXISTS (...)`, or `NOT (EXISTS (...))` when
+/// `expected_negated` is set. In both cases the `Exists` node itself must be
+/// non-negated: negation is expected as a separate `Not` wrapper.
+fn assert_exists_predicate(plan: &LogicalPlan, expected_negated: bool) {
+    let filter = match plan {
+        LogicalPlan::Filter(filter) => filter,
+        LogicalPlan::Projection(projection) => match projection.input.as_ref() {
+            LogicalPlan::Filter(filter) => filter,
+            other => panic!("expected Filter inside Projection, got {other:?}"),
+        },
+        other => panic!("expected Filter plan, got {other:?}"),
+    };
+
+    let predicate = &filter.predicate;
+    let exists = if expected_negated {
+        match predicate {
+            Expr::Not(inner) => match inner.as_ref() {
+                Expr::Exists(exists) => exists,
+                other => panic!("expected Exists inside NOT, got {other:?}"),
+            },
+            other => panic!("expected NOT EXISTS predicate, got {other:?}"),
+        }
+    } else {
+        match predicate {
+            Expr::Exists(exists) => exists,
+            other => panic!("expected EXISTS predicate, got {other:?}"),
+        }
+    };
+    assert!(!exists.negated);
+}
+
async fn roundtrip_fill_na(sql: &str) -> Result<()> {
let ctx = create_context().await?;
let df = ctx.sql(sql).await?;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]