yyy1000 commented on code in PR #9436:
URL: https://github.com/apache/arrow-datafusion/pull/9436#discussion_r1510296803
##########
datafusion/proto/src/physical_plan/to_proto.rs:
##########
@@ -374,228 +378,234 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) ->
Result<AggrFn> {
Ok(AggrFn { inner, distinct })
}
-impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::PhysicalExprNode {
- type Error = DataFusionError;
+fn serialize_expr(
+ value: Arc<dyn PhysicalExpr>,
+ codec: &dyn PhysicalExtensionCodec,
+) -> Result<protobuf::PhysicalExprNode, DataFusionError> {
+ let expr = value.as_any();
- fn try_from(value: Arc<dyn PhysicalExpr>) -> Result<Self, Self::Error> {
- let expr = value.as_any();
-
- if let Some(expr) = expr.downcast_ref::<Column>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
- protobuf::PhysicalColumn {
- name: expr.name().to_string(),
- index: expr.index() as u32,
- },
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
- let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
- l: Some(Box::new(expr.left().to_owned().try_into()?)),
- r: Some(Box::new(expr.right().to_owned().try_into()?)),
- op: format!("{:?}", expr.op()),
- });
+ if let Some(expr) = expr.downcast_ref::<Column>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
+ protobuf::PhysicalColumn {
+ name: expr.name().to_string(),
+ index: expr.index() as u32,
+ },
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
+ let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
+ l: Some(Box::new(serialize_expr(expr.left().clone(), codec)?)),
+ r: Some(Box::new(serialize_expr(expr.right().clone(), codec)?)),
+ op: format!("{:?}", expr.op()),
+ });
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
- binary_expr,
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(
- protobuf::physical_expr_node::ExprType::Case(
- Box::new(
- protobuf::PhysicalCaseNode {
- expr: expr
- .expr()
- .map(|exp|
exp.clone().try_into().map(Box::new))
- .transpose()?,
- when_then_expr: expr
- .when_then_expr()
- .iter()
- .map(|(when_expr, then_expr)| {
- try_parse_when_then_expr(when_expr,
then_expr)
- })
- .collect::<Result<
- Vec<protobuf::PhysicalWhenThen>,
- Self::Error,
- >>()?,
- else_expr: expr
- .else_expr()
- .map(|a|
a.clone().try_into().map(Box::new))
- .transpose()?,
- },
- ),
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
+ binary_expr,
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(
+ protobuf::physical_expr_node::ExprType::Case(
+ Box::new(
+ protobuf::PhysicalCaseNode {
+ expr: expr
+ .expr()
+ .map(|exp| {
+ serialize_expr(exp.clone(),
codec).map(Box::new)
+ })
+ .transpose()?,
+ when_then_expr: expr
+ .when_then_expr()
+ .iter()
+ .map(|(when_expr, then_expr)| {
+ try_parse_when_then_expr(when_expr,
then_expr, codec)
+ })
+ .collect::<Result<
+ Vec<protobuf::PhysicalWhenThen>,
+ DataFusionError,
+ >>()?,
+ else_expr: expr
+ .else_expr()
+ .map(|a| serialize_expr(a.clone(),
codec).map(Box::new))
+ .transpose()?,
+ },
),
),
- })
- } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::NotExpr(
- Box::new(protobuf::PhysicalNot {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
- Box::new(protobuf::PhysicalIsNull {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
- Box::new(protobuf::PhysicalIsNotNull {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(
- protobuf::physical_expr_node::ExprType::InList(
- Box::new(
- protobuf::PhysicalInListNode {
- expr:
Some(Box::new(expr.expr().to_owned().try_into()?)),
- list: expr
- .list()
- .iter()
- .map(|a| a.clone().try_into())
- .collect::<Result<
+ ),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
+ protobuf::PhysicalNot {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ },
+ ))),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
+ Box::new(protobuf::PhysicalIsNull {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ }),
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
+ Box::new(protobuf::PhysicalIsNotNull {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ }),
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(
+ protobuf::physical_expr_node::ExprType::InList(
+ Box::new(
+ protobuf::PhysicalInListNode {
+ expr: Some(Box::new(serialize_expr(
+ expr.expr().to_owned(),
+ codec,
+ )?)),
+ list: expr
+ .list()
+ .iter()
+ .map(|a| serialize_expr(a.clone(), codec))
+ .collect::<Result<
Vec<protobuf::PhysicalExprNode>,
- Self::Error,
+ DataFusionError,
>>()?,
- negated: expr.negated(),
- },
- ),
+ negated: expr.negated(),
+ },
),
),
- })
- } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::Negative(
- Box::new(protobuf::PhysicalNegativeNode {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(lit) = expr.downcast_ref::<Literal>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::Literal(
- lit.value().try_into()?,
- )),
- })
- } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
+ ),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
+ protobuf::PhysicalNegativeNode {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ },
+ ))),
+ })
+ } else if let Some(lit) = expr.downcast_ref::<Literal>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
+ lit.value().try_into()?,
+ )),
+ })
+ } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
+ protobuf::PhysicalCastNode {
+ expr: Some(Box::new(serialize_expr(cast.expr().to_owned(),
codec)?)),
+ arrow_type: Some(cast.cast_type().try_into()?),
+ },
+ ))),
+ })
+ } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
+ protobuf::PhysicalTryCastNode {
+ expr: Some(Box::new(serialize_expr(cast.expr().to_owned(),
codec)?)),
+ arrow_type: Some(cast.cast_type().try_into()?),
+ },
+ ))),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
+ let args: Vec<protobuf::PhysicalExprNode> = expr
+ .args()
+ .iter()
+ .map(|e| serialize_expr(e.to_owned(), codec))
+ .collect::<Result<Vec<_>, _>>()?;
+ if let Ok(fun) = BuiltinScalarFunction::from_str(expr.name()) {
+ let fun: protobuf::ScalarFunction = (&fun).try_into()?;
+
Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
- protobuf::PhysicalCastNode {
- expr: Some(Box::new(cast.expr().clone().try_into()?)),
- arrow_type: Some(cast.cast_type().try_into()?),
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::ScalarFunction(
+ protobuf::PhysicalScalarFunctionNode {
+ name: expr.name().to_string(),
+ fun: fun.into(),
+ args,
+ return_type: Some(expr.return_type().try_into()?),
},
- ))),
- })
- } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::TryCast(
- Box::new(protobuf::PhysicalTryCastNode {
- expr: Some(Box::new(cast.expr().clone().try_into()?)),
- arrow_type: Some(cast.cast_type().try_into()?),
- }),
)),
})
- } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
- let args: Vec<protobuf::PhysicalExprNode> = expr
- .args()
- .iter()
- .map(|e| e.to_owned().try_into())
- .collect::<Result<Vec<_>, _>>()?;
- if let Ok(fun) = BuiltinScalarFunction::from_str(expr.name()) {
- let fun: protobuf::ScalarFunction = (&fun).try_into()?;
-
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(
- protobuf::physical_expr_node::ExprType::ScalarFunction(
- protobuf::PhysicalScalarFunctionNode {
- name: expr.name().to_string(),
- fun: fun.into(),
- args,
- return_type:
Some(expr.return_type().try_into()?),
- },
- ),
- ),
- })
- } else {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
- protobuf::PhysicalScalarUdfNode {
- name: expr.name().to_string(),
- args,
- return_type: Some(expr.return_type().try_into()?),
- },
- )),
- })
- }
- } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
+ } else {
+ let mut buf = Vec::new();
+ // let _ = codec.try_encode_udf(, &mut buf);
Review Comment:
Thanks for your review! @thinkharderdev
Yeah, the change is what I had in mind. I also think some other fields in
`ScalarFunctionExpr`, such as `name` and `return_type`, can be removed.
Yeah, I think creating a `ScalarFunctionImplementation` from a
`ScalarFunctionDefinition` is one way to do it. Given that `BuiltinScalarFunction`
will be removed and we can get the `inner` of a `ScalarUDF`, which is a
`ScalarUDFImpl`, we can do something like:
```
let captured = inner.clone();
Arc::new(move |args| captured.invoke(args))
```
I'd like to hear the opinions of you and @alamb :)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]