This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 60c7bd22d4 Handle serialization of TryCast (#5692)
60c7bd22d4 is described below
commit 60c7bd22d4e0c368c98a12ef6215f2ddc6c60ec2
Author: Dan Harris <[email protected]>
AuthorDate: Thu Mar 23 16:23:24 2023 -0400
Handle serialization of TryCast (#5692)
* Handle serialization of TryCast
* formatting
* add case for invalid expression
---
datafusion/proto/src/logical_plan/mod.rs | 17 ++-
datafusion/proto/src/logical_plan/to_proto.rs | 144 ++++++++++++++++----------
2 files changed, 105 insertions(+), 56 deletions(-)
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index 7a8233f701..cd0f2a66b8 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1407,7 +1407,7 @@ mod roundtrip_tests {
use datafusion_expr::{
col, lit, Accumulator, AggregateFunction,
BuiltinScalarFunction::{Sqrt, Substr},
- Expr, LogicalPlan, Operator, Volatility,
+ Expr, LogicalPlan, Operator, TryCast, Volatility,
};
use datafusion_expr::{
create_udaf, WindowFrame, WindowFrameBound, WindowFrameUnits,
WindowFunction,
@@ -2386,6 +2386,21 @@ mod roundtrip_tests {
roundtrip_expr_test(test_expr, ctx);
}
+ #[test]
+ fn roundtrip_try_cast() {
+ let test_expr =
+ Expr::TryCast(TryCast::new(Box::new(lit(1.0_f32)), DataType::Boolean));
+
+ let ctx = SessionContext::new();
+ roundtrip_expr_test(test_expr, ctx);
+
+ let test_expr =
+ Expr::TryCast(TryCast::new(Box::new(lit("not a bool")), DataType::Boolean));
+
+ let ctx = SessionContext::new();
+ roundtrip_expr_test(test_expr, ctx);
+ }
+
#[test]
fn roundtrip_sort_expr() {
let test_expr = Expr::Sort(Sort::new(Box::new(lit(1.0_f32)), true,
true));
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs b/datafusion/proto/src/logical_plan/to_proto.rs
index da16cd9e02..e8570cf3c7 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -40,7 +40,7 @@ use datafusion_expr::expr::{
use datafusion_expr::{
logical_plan::PlanType, logical_plan::StringifiedPlan, AggregateFunction,
BuiltInWindowFunction, BuiltinScalarFunction, Expr, JoinConstraint,
JoinType,
- WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunction,
+ TryCast, WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunction,
};
#[derive(Debug)]
@@ -489,50 +489,59 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
// linearized from left innermost to right outermost (but
while
// traversing the chain we do the exact opposite).
operands: exprs
- .into_iter()
- .rev()
- .map(|expr| expr.try_into())
- .collect::<Result<Vec<_>, Error>>()?,
+ .into_iter()
+ .rev()
+ .map(|expr| expr.try_into())
+ .collect::<Result<Vec<_>, Error>>()?,
op: format!("{op:?}"),
};
Self {
expr_type: Some(ExprType::BinaryExpr(binary_expr)),
}
}
- Expr::Like(Like { negated, expr, pattern, escape_char }) => {
+ Expr::Like(Like {
+ negated,
+ expr,
+ pattern,
+ escape_char,
+ }) => {
let pb = Box::new(protobuf::LikeNode {
negated: *negated,
expr: Some(Box::new(expr.as_ref().try_into()?)),
pattern: Some(Box::new(pattern.as_ref().try_into()?)),
- escape_char: escape_char
- .map(|ch| ch.to_string())
- .unwrap_or_default()
+ escape_char: escape_char.map(|ch|
ch.to_string()).unwrap_or_default(),
});
Self {
expr_type: Some(ExprType::Like(pb)),
}
}
- Expr::ILike(Like { negated, expr, pattern, escape_char }) => {
+ Expr::ILike(Like {
+ negated,
+ expr,
+ pattern,
+ escape_char,
+ }) => {
let pb = Box::new(protobuf::ILikeNode {
negated: *negated,
expr: Some(Box::new(expr.as_ref().try_into()?)),
pattern: Some(Box::new(pattern.as_ref().try_into()?)),
- escape_char: escape_char
- .map(|ch| ch.to_string())
- .unwrap_or_default(),
+ escape_char: escape_char.map(|ch|
ch.to_string()).unwrap_or_default(),
});
Self {
expr_type: Some(ExprType::Ilike(pb)),
}
}
- Expr::SimilarTo(Like { negated, expr, pattern, escape_char }) => {
+ Expr::SimilarTo(Like {
+ negated,
+ expr,
+ pattern,
+ escape_char,
+ }) => {
let pb = Box::new(protobuf::SimilarToNode {
negated: *negated,
expr: Some(Box::new(expr.as_ref().try_into()?)),
pattern: Some(Box::new(pattern.as_ref().try_into()?)),
- escape_char: escape_char
- .map(|ch| ch.to_string())
- .unwrap_or_default(),
+ escape_char: escape_char.map(|ch|
ch.to_string()).unwrap_or_default(),
});
Self {
expr_type: Some(ExprType::SimilarTo(pb)),
@@ -557,7 +566,11 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
)
}
// TODO: Tracked in
https://github.com/apache/arrow-datafusion/issues/4584
- WindowFunction::AggregateUDF(_) => return
Err(Error::NotImplemented("UDAF as window function in proto".to_string()))
+ WindowFunction::AggregateUDF(_) => {
+ return Err(Error::NotImplemented(
+ "UDAF as window function in proto".to_string(),
+ ))
+ }
};
let arg_expr: Option<Box<Self>> = if !args.is_empty() {
let arg = &args[0];
@@ -574,7 +587,8 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
.map(|e| e.try_into())
.collect::<Result<Vec<_>, _>>()?;
- let window_frame: Option<protobuf::WindowFrame> =
Some(window_frame.try_into()?);
+ let window_frame: Option<protobuf::WindowFrame> =
+ Some(window_frame.try_into()?);
let window_expr = Box::new(protobuf::WindowExprNode {
expr: arg_expr,
window_function: Some(window_function),
@@ -590,7 +604,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
ref fun,
ref args,
ref distinct,
- ref filter
+ ref filter,
}) => {
let aggr_function = match fun {
AggregateFunction::ApproxDistinct => {
@@ -648,7 +662,12 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
expr_type:
Some(ExprType::AggregateExpr(Box::new(aggregate_expr))),
}
}
- Expr::ScalarVariable(_, _) => return Err(Error::General("Proto
serialization error: Scalar Variable not supported".to_string())),
+ Expr::ScalarVariable(_, _) => {
+ return Err(Error::General(
+ "Proto serialization error: Scalar Variable not supported"
+ .to_string(),
+ ))
+ }
Expr::ScalarFunction { ref fun, ref args } => {
let fun: protobuf::ScalarFunction = fun.try_into()?;
let args: Vec<Self> = args
@@ -673,23 +692,22 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
.collect::<Result<Vec<_>, Error>>()?,
})),
},
- Expr::AggregateUDF { fun, args, filter } => {
- Self {
- expr_type: Some(ExprType::AggregateUdfExpr(
- Box::new(protobuf::AggregateUdfExprNode {
- fun_name: fun.name.clone(),
- args: args.iter().map(|expr|
expr.try_into()).collect::<Result<
- Vec<_>,
- Error,
- >>()?,
- filter: match filter {
- Some(e) =>
Some(Box::new(e.as_ref().try_into()?)),
- None => None,
- },
+ Expr::AggregateUDF { fun, args, filter } => Self {
+ expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
+ protobuf::AggregateUdfExprNode {
+ fun_name: fun.name.clone(),
+ args: args.iter().map(|expr|
expr.try_into()).collect::<Result<
+ Vec<_>,
+ Error,
+ >>(
+ )?,
+ filter: match filter {
+ Some(e) => Some(Box::new(e.as_ref().try_into()?)),
+ None => None,
},
- ))),
- }
- }
+ },
+ ))),
+ },
Expr::Not(expr) => {
let expr = Box::new(protobuf::Not {
expr: Some(Box::new(expr.as_ref().try_into()?)),
@@ -779,7 +797,8 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
}
}
Expr::Case(case) => {
- let when_then_expr = case.when_then_expr
+ let when_then_expr = case
+ .when_then_expr
.iter()
.map(|(w, t)| {
Ok(protobuf::WhenThen {
@@ -812,7 +831,16 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
expr_type: Some(ExprType::Cast(expr)),
}
}
- Expr::Sort(Sort{
+ Expr::TryCast(TryCast { expr, data_type }) => {
+ let expr = Box::new(protobuf::TryCastNode {
+ expr: Some(Box::new(expr.as_ref().try_into()?)),
+ arrow_type: Some(data_type.try_into()?),
+ });
+ Self {
+ expr_type: Some(ExprType::TryCast(expr)),
+ }
+ }
+ Expr::Sort(Sort {
expr,
asc,
nulls_first,
@@ -854,27 +882,30 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
Expr::Wildcard => Self {
expr_type: Some(ExprType::Wildcard(true)),
},
- Expr::ScalarSubquery(_) | Expr::InSubquery { .. } | Expr::Exists {
.. } | Expr::OuterReferenceColumn{..} => {
+ Expr::ScalarSubquery(_)
+ | Expr::InSubquery { .. }
+ | Expr::Exists { .. }
+ | Expr::OuterReferenceColumn { .. } => {
// we would need to add logical plan operators to
datafusion.proto to support this
// see discussion in
https://github.com/apache/arrow-datafusion/issues/2565
return Err(Error::General("Proto serialization error:
Expr::ScalarSubquery(_) | Expr::InSubquery { .. } | Expr::Exists { .. } |
Exp:OuterReferenceColumn not supported".to_string()));
}
- Expr::GetIndexedField(GetIndexedField { key, expr }) =>
- Self {
- expr_type: Some(ExprType::GetIndexedField(Box::new(
- protobuf::GetIndexedField {
- key: Some(key.try_into()?),
- expr: Some(Box::new(expr.as_ref().try_into()?)),
- },
- ))),
- },
+ Expr::GetIndexedField(GetIndexedField { key, expr }) => Self {
+ expr_type: Some(ExprType::GetIndexedField(Box::new(
+ protobuf::GetIndexedField {
+ key: Some(key.try_into()?),
+ expr: Some(Box::new(expr.as_ref().try_into()?)),
+ },
+ ))),
+ },
Expr::GroupingSet(GroupingSet::Cube(exprs)) => Self {
expr_type: Some(ExprType::Cube(CubeNode {
expr: exprs.iter().map(|expr|
expr.try_into()).collect::<Result<
Vec<_>,
Self::Error,
- >>()?,
+ >>(
+ )?,
})),
},
Expr::GroupingSet(GroupingSet::Rollup(exprs)) => Self {
@@ -882,7 +913,8 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
expr: exprs.iter().map(|expr|
expr.try_into()).collect::<Result<
Vec<_>,
Self::Error,
- >>()?,
+ >>(
+ )?,
})),
},
Expr::GroupingSet(GroupingSet::GroupingSets(exprs)) => Self {
@@ -900,7 +932,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
.collect::<Result<Vec<_>, Self::Error>>()?,
})),
},
- Expr::Placeholder{ id, data_type } => {
+ Expr::Placeholder { id, data_type } => {
let data_type = match data_type {
Some(data_type) => Some(data_type.try_into()?),
None => None,
@@ -911,10 +943,12 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
data_type,
})),
}
- },
+ }
- Expr::QualifiedWildcard { .. } | Expr::TryCast { .. } =>
- return Err(Error::General("Proto serialization error:
Expr::QualifiedWildcard { .. } | Expr::TryCast { .. } not
supported".to_string())),
+ Expr::QualifiedWildcard { .. } => return Err(Error::General(
+ "Proto serialization error: Expr::QualifiedWildcard { .. } not supported"
+ .to_string(),
+ )),
};
Ok(expr_node)