yyy1000 commented on code in PR #9436:
URL: https://github.com/apache/arrow-datafusion/pull/9436#discussion_r1510159310
##########
datafusion/physical-expr/src/scalar_function.rs:
##########
@@ -44,12 +44,12 @@ use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr::{
expr_vec_fmt, BuiltinScalarFunction, ColumnarValue, FuncMonotonicity,
- ScalarFunctionImplementation,
+ ScalarFunctionDefinition, ScalarFunctionImplementation,
};
/// Physical expression of a scalar function
pub struct ScalarFunctionExpr {
- fun: ScalarFunctionImplementation,
+ fun: ScalarFunctionDefinition,
Review Comment:
I think the `fun` field here could be a `ScalarFunctionDefinition`, so that we
can get the `ScalarUDF` from a `ScalarFunctionExpr`?
Also, I searched the code base and found that `ScalarFunctionImplementation` is
only used for:
1. `BuiltinScalarFunction`
2. methods like `create_udf`, which I think will be deprecated in the future
given the new `ScalarUDFImpl` approach.
So I think `ScalarFunctionImplementation` will also be removed some day. 🤔
WDYT?
##########
datafusion/proto/src/physical_plan/to_proto.rs:
##########
@@ -374,228 +378,234 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) ->
Result<AggrFn> {
Ok(AggrFn { inner, distinct })
}
-impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::PhysicalExprNode {
- type Error = DataFusionError;
+fn serialize_expr(
+ value: Arc<dyn PhysicalExpr>,
+ codec: &dyn PhysicalExtensionCodec,
+) -> Result<protobuf::PhysicalExprNode, DataFusionError> {
+ let expr = value.as_any();
- fn try_from(value: Arc<dyn PhysicalExpr>) -> Result<Self, Self::Error> {
- let expr = value.as_any();
-
- if let Some(expr) = expr.downcast_ref::<Column>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
- protobuf::PhysicalColumn {
- name: expr.name().to_string(),
- index: expr.index() as u32,
- },
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
- let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
- l: Some(Box::new(expr.left().to_owned().try_into()?)),
- r: Some(Box::new(expr.right().to_owned().try_into()?)),
- op: format!("{:?}", expr.op()),
- });
+ if let Some(expr) = expr.downcast_ref::<Column>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
+ protobuf::PhysicalColumn {
+ name: expr.name().to_string(),
+ index: expr.index() as u32,
+ },
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
+ let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
+ l: Some(Box::new(serialize_expr(expr.left().clone(), codec)?)),
+ r: Some(Box::new(serialize_expr(expr.right().clone(), codec)?)),
+ op: format!("{:?}", expr.op()),
+ });
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
- binary_expr,
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(
- protobuf::physical_expr_node::ExprType::Case(
- Box::new(
- protobuf::PhysicalCaseNode {
- expr: expr
- .expr()
- .map(|exp|
exp.clone().try_into().map(Box::new))
- .transpose()?,
- when_then_expr: expr
- .when_then_expr()
- .iter()
- .map(|(when_expr, then_expr)| {
- try_parse_when_then_expr(when_expr,
then_expr)
- })
- .collect::<Result<
- Vec<protobuf::PhysicalWhenThen>,
- Self::Error,
- >>()?,
- else_expr: expr
- .else_expr()
- .map(|a|
a.clone().try_into().map(Box::new))
- .transpose()?,
- },
- ),
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
+ binary_expr,
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(
+ protobuf::physical_expr_node::ExprType::Case(
+ Box::new(
+ protobuf::PhysicalCaseNode {
+ expr: expr
+ .expr()
+ .map(|exp| {
+ serialize_expr(exp.clone(),
codec).map(Box::new)
+ })
+ .transpose()?,
+ when_then_expr: expr
+ .when_then_expr()
+ .iter()
+ .map(|(when_expr, then_expr)| {
+ try_parse_when_then_expr(when_expr,
then_expr, codec)
+ })
+ .collect::<Result<
+ Vec<protobuf::PhysicalWhenThen>,
+ DataFusionError,
+ >>()?,
+ else_expr: expr
+ .else_expr()
+ .map(|a| serialize_expr(a.clone(),
codec).map(Box::new))
+ .transpose()?,
+ },
),
),
- })
- } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::NotExpr(
- Box::new(protobuf::PhysicalNot {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
- Box::new(protobuf::PhysicalIsNull {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
- Box::new(protobuf::PhysicalIsNotNull {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(
- protobuf::physical_expr_node::ExprType::InList(
- Box::new(
- protobuf::PhysicalInListNode {
- expr:
Some(Box::new(expr.expr().to_owned().try_into()?)),
- list: expr
- .list()
- .iter()
- .map(|a| a.clone().try_into())
- .collect::<Result<
+ ),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
+ protobuf::PhysicalNot {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ },
+ ))),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
+ Box::new(protobuf::PhysicalIsNull {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ }),
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
+ Box::new(protobuf::PhysicalIsNotNull {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ }),
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(
+ protobuf::physical_expr_node::ExprType::InList(
+ Box::new(
+ protobuf::PhysicalInListNode {
+ expr: Some(Box::new(serialize_expr(
+ expr.expr().to_owned(),
+ codec,
+ )?)),
+ list: expr
+ .list()
+ .iter()
+ .map(|a| serialize_expr(a.clone(), codec))
+ .collect::<Result<
Vec<protobuf::PhysicalExprNode>,
- Self::Error,
+ DataFusionError,
>>()?,
- negated: expr.negated(),
- },
- ),
+ negated: expr.negated(),
+ },
),
),
- })
- } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::Negative(
- Box::new(protobuf::PhysicalNegativeNode {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(lit) = expr.downcast_ref::<Literal>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::Literal(
- lit.value().try_into()?,
- )),
- })
- } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
+ ),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
+ protobuf::PhysicalNegativeNode {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ },
+ ))),
+ })
+ } else if let Some(lit) = expr.downcast_ref::<Literal>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
+ lit.value().try_into()?,
+ )),
+ })
+ } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
+ protobuf::PhysicalCastNode {
+ expr: Some(Box::new(serialize_expr(cast.expr().to_owned(),
codec)?)),
+ arrow_type: Some(cast.cast_type().try_into()?),
+ },
+ ))),
+ })
+ } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
+ protobuf::PhysicalTryCastNode {
+ expr: Some(Box::new(serialize_expr(cast.expr().to_owned(),
codec)?)),
+ arrow_type: Some(cast.cast_type().try_into()?),
+ },
+ ))),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
+ let args: Vec<protobuf::PhysicalExprNode> = expr
+ .args()
+ .iter()
+ .map(|e| serialize_expr(e.to_owned(), codec))
+ .collect::<Result<Vec<_>, _>>()?;
+ if let Ok(fun) = BuiltinScalarFunction::from_str(expr.name()) {
+ let fun: protobuf::ScalarFunction = (&fun).try_into()?;
+
Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
- protobuf::PhysicalCastNode {
- expr: Some(Box::new(cast.expr().clone().try_into()?)),
- arrow_type: Some(cast.cast_type().try_into()?),
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::ScalarFunction(
+ protobuf::PhysicalScalarFunctionNode {
+ name: expr.name().to_string(),
+ fun: fun.into(),
+ args,
+ return_type: Some(expr.return_type().try_into()?),
},
- ))),
- })
- } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::TryCast(
- Box::new(protobuf::PhysicalTryCastNode {
- expr: Some(Box::new(cast.expr().clone().try_into()?)),
- arrow_type: Some(cast.cast_type().try_into()?),
- }),
)),
})
- } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
- let args: Vec<protobuf::PhysicalExprNode> = expr
- .args()
- .iter()
- .map(|e| e.to_owned().try_into())
- .collect::<Result<Vec<_>, _>>()?;
- if let Ok(fun) = BuiltinScalarFunction::from_str(expr.name()) {
- let fun: protobuf::ScalarFunction = (&fun).try_into()?;
-
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(
- protobuf::physical_expr_node::ExprType::ScalarFunction(
- protobuf::PhysicalScalarFunctionNode {
- name: expr.name().to_string(),
- fun: fun.into(),
- args,
- return_type:
Some(expr.return_type().try_into()?),
- },
- ),
- ),
- })
- } else {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
- protobuf::PhysicalScalarUdfNode {
- name: expr.name().to_string(),
- args,
- return_type: Some(expr.return_type().try_into()?),
- },
- )),
- })
- }
- } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
+ } else {
+ let mut buf = Vec::new();
+ // let _ = codec.try_encode_udf(, &mut buf);
Review Comment:
I have a question: how can I get a `ScalarUDF` from a `ScalarFunctionExpr`?
Given its current structure, I think changing the fields in
`ScalarFunctionExpr` may be one way to do it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]