This is an automated email from the ASF dual-hosted git repository.
wayne pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new cf2de9b22c refactor: use ExprBuilder to consume substrait expr and use
macro to generate error (#8515)
cf2de9b22c is described below
commit cf2de9b22c8e7c4c4b80bdb82ae1353e36a5af51
Author: Ruihang Xia <[email protected]>
AuthorDate: Thu Dec 14 10:42:16 2023 +0800
refactor: use ExprBuilder to consume substrait expr and use macro to
generate error (#8515)
* refactor: use ExprBuilder to consume substrait expr
Signed-off-by: Ruihang Xia <[email protected]>
* use macro to generate error
Signed-off-by: Ruihang Xia <[email protected]>
---------
Signed-off-by: Ruihang Xia <[email protected]>
---
datafusion/common/src/error.rs | 3 +
datafusion/substrait/src/logical_plan/consumer.rs | 324 +++++++++++-----------
2 files changed, 158 insertions(+), 169 deletions(-)
diff --git a/datafusion/common/src/error.rs b/datafusion/common/src/error.rs
index 4ae30ae86c..56b52bd73f 100644
--- a/datafusion/common/src/error.rs
+++ b/datafusion/common/src/error.rs
@@ -517,6 +517,9 @@ make_error!(not_impl_err, not_impl_datafusion_err,
NotImplemented);
// Exposes a macro to create `DataFusionError::Execution`
make_error!(exec_err, exec_datafusion_err, Execution);
+// Exposes a macro to create `DataFusionError::Substrait`
+make_error!(substrait_err, substrait_datafusion_err, Substrait);
+
// Exposes a macro to create `DataFusionError::SQL`
#[macro_export]
macro_rules! sql_err {
diff --git a/datafusion/substrait/src/logical_plan/consumer.rs
b/datafusion/substrait/src/logical_plan/consumer.rs
index ffc9d094ab..f6b556fc64 100644
--- a/datafusion/substrait/src/logical_plan/consumer.rs
+++ b/datafusion/substrait/src/logical_plan/consumer.rs
@@ -17,7 +17,9 @@
use async_recursion::async_recursion;
use datafusion::arrow::datatypes::{DataType, Field, TimeUnit};
-use datafusion::common::{not_impl_err, DFField, DFSchema, DFSchemaRef};
+use datafusion::common::{
+ not_impl_err, substrait_datafusion_err, substrait_err, DFField, DFSchema,
DFSchemaRef,
+};
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::{
@@ -73,16 +75,7 @@ use crate::variation_const::{
enum ScalarFunctionType {
Builtin(BuiltinScalarFunction),
Op(Operator),
- /// [Expr::Not]
- Not,
- /// [Expr::Like] Used for filtering rows based on the given wildcard
pattern. Case sensitive
- Like,
- /// [Expr::Like] Case insensitive operator counterpart of `Like`
- ILike,
- /// [Expr::IsNull]
- IsNull,
- /// [Expr::IsNotNull]
- IsNotNull,
+ Expr(BuiltinExprBuilder),
}
pub fn name_to_op(name: &str) -> Result<Operator> {
@@ -127,14 +120,11 @@ fn scalar_function_type_from_str(name: &str) ->
Result<ScalarFunctionType> {
return Ok(ScalarFunctionType::Builtin(fun));
}
- match name {
- "not" => Ok(ScalarFunctionType::Not),
- "like" => Ok(ScalarFunctionType::Like),
- "ilike" => Ok(ScalarFunctionType::ILike),
- "is_null" => Ok(ScalarFunctionType::IsNull),
- "is_not_null" => Ok(ScalarFunctionType::IsNotNull),
- others => not_impl_err!("Unsupported function name: {others:?}"),
+ if let Some(builder) = BuiltinExprBuilder::try_from_name(name) {
+ return Ok(ScalarFunctionType::Expr(builder));
}
+
+ not_impl_err!("Unsupported function name: {name:?}")
}
fn split_eq_and_noneq_join_predicate_with_nulls_equality(
@@ -519,9 +509,7 @@ pub async fn from_substrait_rel(
},
Some(RelType::ExtensionLeaf(extension)) => {
let Some(ext_detail) = &extension.detail else {
- return Err(DataFusionError::Substrait(
- "Unexpected empty detail in ExtensionLeafRel".to_string(),
- ));
+ return substrait_err!("Unexpected empty detail in
ExtensionLeafRel");
};
let plan = ctx
.state()
@@ -531,18 +519,16 @@ pub async fn from_substrait_rel(
}
Some(RelType::ExtensionSingle(extension)) => {
let Some(ext_detail) = &extension.detail else {
- return Err(DataFusionError::Substrait(
- "Unexpected empty detail in
ExtensionSingleRel".to_string(),
- ));
+ return substrait_err!("Unexpected empty detail in
ExtensionSingleRel");
};
let plan = ctx
.state()
.serializer_registry()
.deserialize_logical_plan(&ext_detail.type_url,
&ext_detail.value)?;
let Some(input_rel) = &extension.input else {
- return Err(DataFusionError::Substrait(
- "ExtensionSingleRel doesn't contains input rel. Try use
ExtensionLeafRel instead".to_string()
- ));
+ return substrait_err!(
+ "ExtensionSingleRel doesn't contains input rel. Try use
ExtensionLeafRel instead"
+ );
};
let input_plan = from_substrait_rel(ctx, input_rel,
extensions).await?;
let plan = plan.from_template(&plan.expressions(), &[input_plan]);
@@ -550,9 +536,7 @@ pub async fn from_substrait_rel(
}
Some(RelType::ExtensionMulti(extension)) => {
let Some(ext_detail) = &extension.detail else {
- return Err(DataFusionError::Substrait(
- "Unexpected empty detail in
ExtensionSingleRel".to_string(),
- ));
+ return substrait_err!("Unexpected empty detail in
ExtensionSingleRel");
};
let plan = ctx
.state()
@@ -881,64 +865,8 @@ pub async fn from_substrait_rex(
),
}
}
- ScalarFunctionType::Not => {
- let arg = f.arguments.first().ok_or_else(|| {
- DataFusionError::Substrait(
- "expect one argument for `NOT` expr".to_string(),
- )
- })?;
- match &arg.arg_type {
- Some(ArgType::Value(e)) => {
- let expr = from_substrait_rex(e, input_schema,
extensions)
- .await?
- .as_ref()
- .clone();
- Ok(Arc::new(Expr::Not(Box::new(expr))))
- }
- _ => not_impl_err!("Invalid arguments for Not
expression"),
- }
- }
- ScalarFunctionType::Like => {
- make_datafusion_like(false, f, input_schema,
extensions).await
- }
- ScalarFunctionType::ILike => {
- make_datafusion_like(true, f, input_schema,
extensions).await
- }
- ScalarFunctionType::IsNull => {
- let arg = f.arguments.first().ok_or_else(|| {
- DataFusionError::Substrait(
- "expect one argument for `IS NULL`
expr".to_string(),
- )
- })?;
- match &arg.arg_type {
- Some(ArgType::Value(e)) => {
- let expr = from_substrait_rex(e, input_schema,
extensions)
- .await?
- .as_ref()
- .clone();
- Ok(Arc::new(Expr::IsNull(Box::new(expr))))
- }
- _ => not_impl_err!("Invalid arguments for IS NULL
expression"),
- }
- }
- ScalarFunctionType::IsNotNull => {
- let arg = f.arguments.first().ok_or_else(|| {
- DataFusionError::Substrait(
- "expect one argument for `IS NOT NULL`
expr".to_string(),
- )
- })?;
- match &arg.arg_type {
- Some(ArgType::Value(e)) => {
- let expr = from_substrait_rex(e, input_schema,
extensions)
- .await?
- .as_ref()
- .clone();
- Ok(Arc::new(Expr::IsNotNull(Box::new(expr))))
- }
- _ => {
- not_impl_err!("Invalid arguments for IS NOT NULL
expression")
- }
- }
+ ScalarFunctionType::Expr(builder) => {
+ builder.build(f, input_schema, extensions).await
}
}
}
@@ -960,9 +888,7 @@ pub async fn from_substrait_rex(
),
from_substrait_type(output_type)?,
)))),
- None => Err(DataFusionError::Substrait(
- "Cast experssion without output type is not
allowed".to_string(),
- )),
+ None => substrait_err!("Cast experssion without output type is not
allowed"),
},
Some(RexType::WindowFunction(window)) => {
let fun = match extensions.get(&window.function_reference) {
@@ -1087,9 +1013,7 @@ fn from_substrait_type(dt: &substrait::proto::Type) ->
Result<DataType> {
r#type::Kind::List(list) => {
let inner_type =
from_substrait_type(list.r#type.as_ref().ok_or_else(|| {
- DataFusionError::Substrait(
- "List type must have inner type".to_string(),
- )
+ substrait_datafusion_err!("List type must have inner
type")
})?)?;
let field = Arc::new(Field::new("list_item", inner_type,
true));
match list.type_variation_reference {
@@ -1141,9 +1065,7 @@ fn from_substrait_bound(
}
}
},
- None => Err(DataFusionError::Substrait(
- "WindowFunction missing Substrait Bound kind".to_string(),
- )),
+ None => substrait_err!("WindowFunction missing Substrait Bound
kind"),
},
None => {
if is_lower {
@@ -1162,36 +1084,28 @@ pub(crate) fn from_substrait_literal(lit: &Literal) ->
Result<ScalarValue> {
DEFAULT_TYPE_REF => ScalarValue::Int8(Some(*n as i8)),
UNSIGNED_INTEGER_TYPE_REF => ScalarValue::UInt8(Some(*n as u8)),
others => {
- return Err(DataFusionError::Substrait(format!(
- "Unknown type variation reference {others}",
- )));
+ return substrait_err!("Unknown type variation reference
{others}");
}
},
Some(LiteralType::I16(n)) => match lit.type_variation_reference {
DEFAULT_TYPE_REF => ScalarValue::Int16(Some(*n as i16)),
UNSIGNED_INTEGER_TYPE_REF => ScalarValue::UInt16(Some(*n as u16)),
others => {
- return Err(DataFusionError::Substrait(format!(
- "Unknown type variation reference {others}",
- )));
+ return substrait_err!("Unknown type variation reference
{others}");
}
},
Some(LiteralType::I32(n)) => match lit.type_variation_reference {
DEFAULT_TYPE_REF => ScalarValue::Int32(Some(*n)),
UNSIGNED_INTEGER_TYPE_REF => ScalarValue::UInt32(Some(*n as u32)),
others => {
- return Err(DataFusionError::Substrait(format!(
- "Unknown type variation reference {others}",
- )));
+ return substrait_err!("Unknown type variation reference
{others}");
}
},
Some(LiteralType::I64(n)) => match lit.type_variation_reference {
DEFAULT_TYPE_REF => ScalarValue::Int64(Some(*n)),
UNSIGNED_INTEGER_TYPE_REF => ScalarValue::UInt64(Some(*n as u64)),
others => {
- return Err(DataFusionError::Substrait(format!(
- "Unknown type variation reference {others}",
- )));
+ return substrait_err!("Unknown type variation reference
{others}");
}
},
Some(LiteralType::Fp32(f)) => ScalarValue::Float32(Some(*f)),
@@ -1202,9 +1116,7 @@ pub(crate) fn from_substrait_literal(lit: &Literal) ->
Result<ScalarValue> {
TIMESTAMP_MICRO_TYPE_REF =>
ScalarValue::TimestampMicrosecond(Some(*t), None),
TIMESTAMP_NANO_TYPE_REF =>
ScalarValue::TimestampNanosecond(Some(*t), None),
others => {
- return Err(DataFusionError::Substrait(format!(
- "Unknown type variation reference {others}",
- )));
+ return substrait_err!("Unknown type variation reference
{others}");
}
},
Some(LiteralType::Date(d)) => ScalarValue::Date32(Some(*d)),
@@ -1212,38 +1124,30 @@ pub(crate) fn from_substrait_literal(lit: &Literal) ->
Result<ScalarValue> {
DEFAULT_CONTAINER_TYPE_REF => ScalarValue::Utf8(Some(s.clone())),
LARGE_CONTAINER_TYPE_REF =>
ScalarValue::LargeUtf8(Some(s.clone())),
others => {
- return Err(DataFusionError::Substrait(format!(
- "Unknown type variation reference {others}",
- )));
+ return substrait_err!("Unknown type variation reference
{others}");
}
},
Some(LiteralType::Binary(b)) => match lit.type_variation_reference {
DEFAULT_CONTAINER_TYPE_REF => ScalarValue::Binary(Some(b.clone())),
LARGE_CONTAINER_TYPE_REF =>
ScalarValue::LargeBinary(Some(b.clone())),
others => {
- return Err(DataFusionError::Substrait(format!(
- "Unknown type variation reference {others}",
- )));
+ return substrait_err!("Unknown type variation reference
{others}");
}
},
Some(LiteralType::FixedBinary(b)) => {
ScalarValue::FixedSizeBinary(b.len() as _, Some(b.clone()))
}
Some(LiteralType::Decimal(d)) => {
- let value: [u8; 16] =
- d.value
- .clone()
- .try_into()
- .or(Err(DataFusionError::Substrait(
- "Failed to parse decimal value".to_string(),
- )))?;
+ let value: [u8; 16] = d
+ .value
+ .clone()
+ .try_into()
+ .or(substrait_err!("Failed to parse decimal value"))?;
let p = d.precision.try_into().map_err(|e| {
- DataFusionError::Substrait(format!(
- "Failed to parse decimal precision: {e}"
- ))
+ substrait_datafusion_err!("Failed to parse decimal precision:
{e}")
})?;
let s = d.scale.try_into().map_err(|e| {
- DataFusionError::Substrait(format!("Failed to parse decimal
scale: {e}"))
+ substrait_datafusion_err!("Failed to parse decimal scale: {e}")
})?;
ScalarValue::Decimal128(
Some(std::primitive::i128::from_le_bytes(value)),
@@ -1341,50 +1245,132 @@ fn from_substrait_null(null_type: &Type) ->
Result<ScalarValue> {
}
}
-async fn make_datafusion_like(
- case_insensitive: bool,
- f: &ScalarFunction,
- input_schema: &DFSchema,
- extensions: &HashMap<u32, &String>,
-) -> Result<Arc<Expr>> {
- let fn_name = if case_insensitive { "ILIKE" } else { "LIKE" };
- if f.arguments.len() != 3 {
- return not_impl_err!("Expect three arguments for `{fn_name}` expr");
+/// Build [`Expr`] from its name and required inputs.
+struct BuiltinExprBuilder {
+ expr_name: String,
+}
+
+impl BuiltinExprBuilder {
+ pub fn try_from_name(name: &str) -> Option<Self> {
+ match name {
+ "not" | "like" | "ilike" | "is_null" | "is_not_null" => Some(Self {
+ expr_name: name.to_string(),
+ }),
+ _ => None,
+ }
}
- let Some(ArgType::Value(expr_substrait)) = &f.arguments[0].arg_type else {
- return not_impl_err!("Invalid arguments type for `{fn_name}` expr");
- };
- let expr = from_substrait_rex(expr_substrait, input_schema, extensions)
- .await?
- .as_ref()
- .clone();
- let Some(ArgType::Value(pattern_substrait)) = &f.arguments[1].arg_type
else {
- return not_impl_err!("Invalid arguments type for `{fn_name}` expr");
- };
- let pattern = from_substrait_rex(pattern_substrait, input_schema,
extensions)
- .await?
- .as_ref()
- .clone();
- let Some(ArgType::Value(escape_char_substrait)) = &f.arguments[2].arg_type
else {
- return not_impl_err!("Invalid arguments type for `{fn_name}` expr");
- };
- let escape_char_expr =
- from_substrait_rex(escape_char_substrait, input_schema, extensions)
+ pub async fn build(
+ self,
+ f: &ScalarFunction,
+ input_schema: &DFSchema,
+ extensions: &HashMap<u32, &String>,
+ ) -> Result<Arc<Expr>> {
+ match self.expr_name.as_str() {
+ "not" => Self::build_not_expr(f, input_schema, extensions).await,
+ "like" => Self::build_like_expr(false, f, input_schema,
extensions).await,
+ "ilike" => Self::build_like_expr(true, f, input_schema,
extensions).await,
+ "is_null" => {
+ Self::build_is_null_expr(false, f, input_schema,
extensions).await
+ }
+ "is_not_null" => {
+ Self::build_is_null_expr(true, f, input_schema,
extensions).await
+ }
+ _ => {
+ not_impl_err!("Unsupported builtin expression: {}",
self.expr_name)
+ }
+ }
+ }
+
+ async fn build_not_expr(
+ f: &ScalarFunction,
+ input_schema: &DFSchema,
+ extensions: &HashMap<u32, &String>,
+ ) -> Result<Arc<Expr>> {
+ if f.arguments.len() != 1 {
+ return not_impl_err!("Expect one argument for `NOT` expr");
+ }
+ let Some(ArgType::Value(expr_substrait)) = &f.arguments[0].arg_type
else {
+ return not_impl_err!("Invalid arguments type for `NOT` expr");
+ };
+ let expr = from_substrait_rex(expr_substrait, input_schema, extensions)
.await?
.as_ref()
.clone();
- let Expr::Literal(ScalarValue::Utf8(escape_char)) = escape_char_expr else {
- return Err(DataFusionError::Substrait(format!(
- "Expect Utf8 literal for escape char, but found
{escape_char_expr:?}",
- )));
- };
+ Ok(Arc::new(Expr::Not(Box::new(expr))))
+ }
+
+ async fn build_like_expr(
+ case_insensitive: bool,
+ f: &ScalarFunction,
+ input_schema: &DFSchema,
+ extensions: &HashMap<u32, &String>,
+ ) -> Result<Arc<Expr>> {
+ let fn_name = if case_insensitive { "ILIKE" } else { "LIKE" };
+ if f.arguments.len() != 3 {
+ return not_impl_err!("Expect three arguments for `{fn_name}`
expr");
+ }
+
+ let Some(ArgType::Value(expr_substrait)) = &f.arguments[0].arg_type
else {
+ return not_impl_err!("Invalid arguments type for `{fn_name}`
expr");
+ };
+ let expr = from_substrait_rex(expr_substrait, input_schema, extensions)
+ .await?
+ .as_ref()
+ .clone();
+ let Some(ArgType::Value(pattern_substrait)) = &f.arguments[1].arg_type
else {
+ return not_impl_err!("Invalid arguments type for `{fn_name}`
expr");
+ };
+ let pattern = from_substrait_rex(pattern_substrait, input_schema,
extensions)
+ .await?
+ .as_ref()
+ .clone();
+ let Some(ArgType::Value(escape_char_substrait)) =
&f.arguments[2].arg_type else {
+ return not_impl_err!("Invalid arguments type for `{fn_name}`
expr");
+ };
+ let escape_char_expr =
+ from_substrait_rex(escape_char_substrait, input_schema, extensions)
+ .await?
+ .as_ref()
+ .clone();
+ let Expr::Literal(ScalarValue::Utf8(escape_char)) = escape_char_expr
else {
+ return substrait_err!(
+ "Expect Utf8 literal for escape char, but found
{escape_char_expr:?}"
+ );
+ };
- Ok(Arc::new(Expr::Like(Like {
- negated: false,
- expr: Box::new(expr),
- pattern: Box::new(pattern),
- escape_char: escape_char.map(|c| c.chars().next().unwrap()),
- case_insensitive,
- })))
+ Ok(Arc::new(Expr::Like(Like {
+ negated: false,
+ expr: Box::new(expr),
+ pattern: Box::new(pattern),
+ escape_char: escape_char.map(|c| c.chars().next().unwrap()),
+ case_insensitive,
+ })))
+ }
+
+ async fn build_is_null_expr(
+ is_not: bool,
+ f: &ScalarFunction,
+ input_schema: &DFSchema,
+ extensions: &HashMap<u32, &String>,
+ ) -> Result<Arc<Expr>> {
+ let fn_name = if is_not { "IS NOT NULL" } else { "IS NULL" };
+ let arg = f.arguments.first().ok_or_else(|| {
+ substrait_datafusion_err!("expect one argument for `{fn_name}`
expr")
+ })?;
+ match &arg.arg_type {
+ Some(ArgType::Value(e)) => {
+ let expr = from_substrait_rex(e, input_schema, extensions)
+ .await?
+ .as_ref()
+ .clone();
+ if is_not {
+ Ok(Arc::new(Expr::IsNotNull(Box::new(expr))))
+ } else {
+ Ok(Arc::new(Expr::IsNull(Box::new(expr))))
+ }
+ }
+ _ => substrait_err!("Invalid arguments for `{fn_name}`
expression"),
+ }
+ }
}