This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 32cb3c5a54 Minor: remove clones and unnecessary Arcs in `from_substrait_rex` (#11337)
32cb3c5a54 is described below
commit 32cb3c5a54bd0297d473792c8a3b0e7fd51c2e3b
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Jul 10 14:21:44 2024 -0400
Minor: remove clones and unnecessary Arcs in `from_substrait_rex` (#11337)
---
datafusion/substrait/src/logical_plan/consumer.rs | 146 +++++++++-------------
1 file changed, 59 insertions(+), 87 deletions(-)
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs
b/datafusion/substrait/src/logical_plan/consumer.rs
index 89a6dde51e..a4f7242024 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -411,11 +411,11 @@ pub async fn from_substrait_rel(
from_substrait_rex(ctx, e, input.clone().schema(),
extensions)
.await?;
// if the expression is WindowFunction, wrap in a Window
relation
- if let Expr::WindowFunction(_) = x.as_ref() {
+ if let Expr::WindowFunction(_) = &x {
// Adding the same expression here and in the project
below
// works because the project's builder uses
columnize_expr(..)
// to transform it into a column reference
- input = input.window(vec![x.as_ref().clone()])?
+ input = input.window(vec![x.clone()])?
}
// Ensure the expression has a unique display name, so
that project's
// validate_unique_names doesn't fail
@@ -426,12 +426,12 @@ pub async fn from_substrait_rel(
new_name = format!("{}__temp__{}", name, i);
i += 1;
}
- names.insert(new_name.clone());
if new_name != name {
- exprs.push(x.as_ref().clone().alias(new_name.clone()));
+ exprs.push(x.alias(new_name.clone()));
} else {
- exprs.push(x.as_ref().clone());
+ exprs.push(x);
}
+ names.insert(new_name);
}
input.project(exprs)?.build()
} else {
@@ -447,7 +447,7 @@ pub async fn from_substrait_rel(
let expr =
from_substrait_rex(ctx, condition, input.schema(),
extensions)
.await?;
- input.filter(expr.as_ref().clone())?.build()
+ input.filter(expr)?.build()
} else {
not_impl_err!("Filter without an condition is not valid")
}
@@ -499,7 +499,7 @@ pub async fn from_substrait_rel(
let x =
from_substrait_rex(ctx, e, input.schema(),
extensions)
.await?;
- group_expr.push(x.as_ref().clone());
+ group_expr.push(x);
}
}
_ => {
@@ -514,7 +514,7 @@ pub async fn from_substrait_rel(
extensions,
)
.await?;
- grouping_set.push(x.as_ref().clone());
+ grouping_set.push(x);
}
grouping_sets.push(grouping_set);
}
@@ -532,9 +532,7 @@ pub async fn from_substrait_rel(
let filter = match &m.filter {
Some(fil) => Some(Box::new(
from_substrait_rex(ctx, fil, input.schema(),
extensions)
- .await?
- .as_ref()
- .clone(),
+ .await?,
)),
None => None,
};
@@ -931,7 +929,7 @@ pub async fn from_substrait_sorts(
};
let (asc, nulls_first) = asc_nullfirst.unwrap();
sorts.push(Expr::Sort(Sort {
- expr: Box::new(expr.as_ref().clone()),
+ expr: Box::new(expr),
asc,
nulls_first,
}));
@@ -949,7 +947,7 @@ pub async fn from_substrait_rex_vec(
let mut expressions: Vec<Expr> = vec![];
for expr in exprs {
let expression = from_substrait_rex(ctx, expr, input_schema,
extensions).await?;
- expressions.push(expression.as_ref().clone());
+ expressions.push(expression);
}
Ok(expressions)
}
@@ -969,7 +967,7 @@ pub async fn from_substrait_func_args(
}
_ => not_impl_err!("Function argument non-Value type not
supported"),
};
- args.push(arg_expr?.as_ref().clone());
+ args.push(arg_expr?);
}
Ok(args)
}
@@ -1028,17 +1026,15 @@ pub async fn from_substrait_rex(
e: &Expression,
input_schema: &DFSchema,
extensions: &HashMap<u32, &String>,
-) -> Result<Arc<Expr>> {
+) -> Result<Expr> {
match &e.rex_type {
Some(RexType::SingularOrList(s)) => {
let substrait_expr = s.value.as_ref().unwrap();
let substrait_list = s.options.as_ref();
- Ok(Arc::new(Expr::InList(InList {
+ Ok(Expr::InList(InList {
expr: Box::new(
from_substrait_rex(ctx, substrait_expr, input_schema,
extensions)
- .await?
- .as_ref()
- .clone(),
+ .await?,
),
list: from_substrait_rex_vec(
ctx,
@@ -1048,11 +1044,11 @@ pub async fn from_substrait_rex(
)
.await?,
negated: false,
- })))
+ }))
+ }
+ Some(RexType::Selection(field_ref)) => {
+ Ok(from_substrait_field_reference(field_ref, input_schema)?)
}
- Some(RexType::Selection(field_ref)) => Ok(Arc::new(
- from_substrait_field_reference(field_ref, input_schema)?,
- )),
Some(RexType::IfThen(if_then)) => {
// Parse `ifs`
// If the first element does not have a `then` part, then we can
assume it's a base expression
@@ -1069,9 +1065,7 @@ pub async fn from_substrait_rex(
input_schema,
extensions,
)
- .await?
- .as_ref()
- .clone(),
+ .await?,
));
continue;
}
@@ -1084,9 +1078,7 @@ pub async fn from_substrait_rex(
input_schema,
extensions,
)
- .await?
- .as_ref()
- .clone(),
+ .await?,
),
Box::new(
from_substrait_rex(
@@ -1095,27 +1087,22 @@ pub async fn from_substrait_rex(
input_schema,
extensions,
)
- .await?
- .as_ref()
- .clone(),
+ .await?,
),
));
}
// Parse `else`
let else_expr = match &if_then.r#else {
Some(e) => Some(Box::new(
- from_substrait_rex(ctx, e, input_schema, extensions)
- .await?
- .as_ref()
- .clone(),
+ from_substrait_rex(ctx, e, input_schema,
extensions).await?,
)),
None => None,
};
- Ok(Arc::new(Expr::Case(Case {
+ Ok(Expr::Case(Case {
expr,
when_then_expr,
else_expr,
- })))
+ }))
}
Some(RexType::ScalarFunction(f)) => {
let Some(fn_name) = extensions.get(&f.function_reference) else {
@@ -1133,8 +1120,9 @@ pub async fn from_substrait_rex(
// try to first match the requested function into registered udfs,
then built-in ops
// and finally built-in expressions
if let Some(func) = ctx.state().scalar_functions().get(fn_name) {
- Ok(Arc::new(Expr::ScalarFunction(
- expr::ScalarFunction::new_udf(func.to_owned(), args),
+ Ok(Expr::ScalarFunction(expr::ScalarFunction::new_udf(
+ func.to_owned(),
+ args,
)))
} else if let Some(op) = name_to_op(fn_name) {
if f.arguments.len() < 2 {
@@ -1147,17 +1135,14 @@ pub async fn from_substrait_rex(
// In those cases we iterate through all the arguments,
applying the binary expression against them all
let combined_expr = args
.into_iter()
- .fold(None, |combined_expr: Option<Arc<Expr>>, arg: Expr| {
+ .fold(None, |combined_expr: Option<Expr>, arg: Expr| {
Some(match combined_expr {
- Some(expr) => Arc::new(Expr::BinaryExpr(BinaryExpr
{
- left: Box::new(
- Arc::try_unwrap(expr)
- .unwrap_or_else(|arc: Arc<Expr>|
(*arc).clone()),
- ), // Avoid cloning if possible
+ Some(expr) => Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(expr),
op,
right: Box::new(arg),
- })),
- None => Arc::new(arg),
+ }),
+ None => arg,
})
})
.unwrap();
@@ -1171,10 +1156,10 @@ pub async fn from_substrait_rex(
}
Some(RexType::Literal(lit)) => {
let scalar_value = from_substrait_literal_without_names(lit)?;
- Ok(Arc::new(Expr::Literal(scalar_value)))
+ Ok(Expr::Literal(scalar_value))
}
Some(RexType::Cast(cast)) => match cast.as_ref().r#type.as_ref() {
- Some(output_type) => Ok(Arc::new(Expr::Cast(Cast::new(
+ Some(output_type) => Ok(Expr::Cast(Cast::new(
Box::new(
from_substrait_rex(
ctx,
@@ -1182,12 +1167,10 @@ pub async fn from_substrait_rex(
input_schema,
extensions,
)
- .await?
- .as_ref()
- .clone(),
+ .await?,
),
from_substrait_type_without_names(output_type)?,
- )))),
+ ))),
None => substrait_err!("Cast expression without output type is not
allowed"),
},
Some(RexType::WindowFunction(window)) => {
@@ -1232,7 +1215,7 @@ pub async fn from_substrait_rex(
}
}
};
- Ok(Arc::new(Expr::WindowFunction(expr::WindowFunction {
+ Ok(Expr::WindowFunction(expr::WindowFunction {
fun,
args: from_substrait_func_args(
ctx,
@@ -1255,7 +1238,7 @@ pub async fn from_substrait_rex(
from_substrait_bound(&window.upper_bound, false)?,
),
null_treatment: None,
- })))
+ }))
}
Some(RexType::Subquery(subquery)) => match
&subquery.as_ref().subquery_type {
Some(subquery_type) => match subquery_type {
@@ -1270,7 +1253,7 @@ pub async fn from_substrait_rex(
from_substrait_rel(ctx, haystack_expr,
extensions)
.await?;
let outer_refs = haystack_expr.all_out_ref_exprs();
- Ok(Arc::new(Expr::InSubquery(InSubquery {
+ Ok(Expr::InSubquery(InSubquery {
expr: Box::new(
from_substrait_rex(
ctx,
@@ -1278,16 +1261,14 @@ pub async fn from_substrait_rex(
input_schema,
extensions,
)
- .await?
- .as_ref()
- .clone(),
+ .await?,
),
subquery: Subquery {
subquery: Arc::new(haystack_expr),
outer_ref_columns: outer_refs,
},
negated: false,
- })))
+ }))
} else {
substrait_err!("InPredicate Subquery type must
have a Haystack expression")
}
@@ -1301,10 +1282,10 @@ pub async fn from_substrait_rex(
)
.await?;
let outer_ref_columns = plan.all_out_ref_exprs();
- Ok(Arc::new(Expr::ScalarSubquery(Subquery {
+ Ok(Expr::ScalarSubquery(Subquery {
subquery: Arc::new(plan),
outer_ref_columns,
- })))
+ }))
}
SubqueryType::SetPredicate(predicate) => {
match predicate.predicate_op() {
@@ -1318,13 +1299,13 @@ pub async fn from_substrait_rex(
)
.await?;
let outer_ref_columns = plan.all_out_ref_exprs();
- Ok(Arc::new(Expr::Exists(Exists::new(
+ Ok(Expr::Exists(Exists::new(
Subquery {
subquery: Arc::new(plan),
outer_ref_columns,
},
false,
- ))))
+ )))
}
other_type => substrait_err!(
"unimplemented type {:?} for set predicate",
@@ -1337,7 +1318,7 @@ pub async fn from_substrait_rex(
}
},
None => {
- substrait_err!("Subquery experssion without SubqueryType is
not allowed")
+ substrait_err!("Subquery expression without SubqueryType is
not allowed")
}
},
_ => not_impl_err!("unsupported rex_type"),
@@ -2001,7 +1982,7 @@ impl BuiltinExprBuilder {
f: &ScalarFunction,
input_schema: &DFSchema,
extensions: &HashMap<u32, &String>,
- ) -> Result<Arc<Expr>> {
+ ) -> Result<Expr> {
match self.expr_name.as_str() {
"like" => {
Self::build_like_expr(ctx, false, f, input_schema,
extensions).await
@@ -2026,17 +2007,15 @@ impl BuiltinExprBuilder {
f: &ScalarFunction,
input_schema: &DFSchema,
extensions: &HashMap<u32, &String>,
- ) -> Result<Arc<Expr>> {
+ ) -> Result<Expr> {
if f.arguments.len() != 1 {
return substrait_err!("Expect one argument for {fn_name} expr");
}
let Some(ArgType::Value(expr_substrait)) = &f.arguments[0].arg_type
else {
return substrait_err!("Invalid arguments type for {fn_name} expr");
};
- let arg = from_substrait_rex(ctx, expr_substrait, input_schema,
extensions)
- .await?
- .as_ref()
- .clone();
+ let arg =
+ from_substrait_rex(ctx, expr_substrait, input_schema,
extensions).await?;
let arg = Box::new(arg);
let expr = match fn_name {
@@ -2053,7 +2032,7 @@ impl BuiltinExprBuilder {
_ => return not_impl_err!("Unsupported builtin expression: {}",
fn_name),
};
- Ok(Arc::new(expr))
+ Ok(expr)
}
async fn build_like_expr(
@@ -2062,7 +2041,7 @@ impl BuiltinExprBuilder {
f: &ScalarFunction,
input_schema: &DFSchema,
extensions: &HashMap<u32, &String>,
- ) -> Result<Arc<Expr>> {
+ ) -> Result<Expr> {
let fn_name = if case_insensitive { "ILIKE" } else { "LIKE" };
if f.arguments.len() != 2 && f.arguments.len() != 3 {
return substrait_err!("Expect two or three arguments for
`{fn_name}` expr");
@@ -2071,18 +2050,13 @@ impl BuiltinExprBuilder {
let Some(ArgType::Value(expr_substrait)) = &f.arguments[0].arg_type
else {
return substrait_err!("Invalid arguments type for `{fn_name}`
expr");
};
- let expr = from_substrait_rex(ctx, expr_substrait, input_schema,
extensions)
- .await?
- .as_ref()
- .clone();
+ let expr =
+ from_substrait_rex(ctx, expr_substrait, input_schema,
extensions).await?;
let Some(ArgType::Value(pattern_substrait)) = &f.arguments[1].arg_type
else {
return substrait_err!("Invalid arguments type for `{fn_name}`
expr");
};
let pattern =
- from_substrait_rex(ctx, pattern_substrait, input_schema,
extensions)
- .await?
- .as_ref()
- .clone();
+ from_substrait_rex(ctx, pattern_substrait, input_schema,
extensions).await?;
// Default case: escape character is Literal(Utf8(None))
let escape_char = if f.arguments.len() == 3 {
@@ -2093,9 +2067,7 @@ impl BuiltinExprBuilder {
let escape_char_expr =
from_substrait_rex(ctx, escape_char_substrait, input_schema,
extensions)
- .await?
- .as_ref()
- .clone();
+ .await?;
match escape_char_expr {
Expr::Literal(ScalarValue::Utf8(escape_char_string)) => {
@@ -2112,12 +2084,12 @@ impl BuiltinExprBuilder {
None
};
- Ok(Arc::new(Expr::Like(Like {
+ Ok(Expr::Like(Like {
negated: false,
expr: Box::new(expr),
pattern: Box::new(pattern),
escape_char,
case_insensitive,
- })))
+ }))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]