thinkharderdev commented on code in PR #9436:
URL: https://github.com/apache/arrow-datafusion/pull/9436#discussion_r1510246296
##########
datafusion/proto/src/physical_plan/to_proto.rs:
##########
@@ -374,228 +378,234 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) ->
Result<AggrFn> {
Ok(AggrFn { inner, distinct })
}
-impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::PhysicalExprNode {
- type Error = DataFusionError;
+fn serialize_expr(
+ value: Arc<dyn PhysicalExpr>,
+ codec: &dyn PhysicalExtensionCodec,
+) -> Result<protobuf::PhysicalExprNode, DataFusionError> {
+ let expr = value.as_any();
- fn try_from(value: Arc<dyn PhysicalExpr>) -> Result<Self, Self::Error> {
- let expr = value.as_any();
-
- if let Some(expr) = expr.downcast_ref::<Column>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
- protobuf::PhysicalColumn {
- name: expr.name().to_string(),
- index: expr.index() as u32,
- },
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
- let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
- l: Some(Box::new(expr.left().to_owned().try_into()?)),
- r: Some(Box::new(expr.right().to_owned().try_into()?)),
- op: format!("{:?}", expr.op()),
- });
+ if let Some(expr) = expr.downcast_ref::<Column>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
+ protobuf::PhysicalColumn {
+ name: expr.name().to_string(),
+ index: expr.index() as u32,
+ },
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
+ let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
+ l: Some(Box::new(serialize_expr(expr.left().clone(), codec)?)),
+ r: Some(Box::new(serialize_expr(expr.right().clone(), codec)?)),
+ op: format!("{:?}", expr.op()),
+ });
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
- binary_expr,
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(
- protobuf::physical_expr_node::ExprType::Case(
- Box::new(
- protobuf::PhysicalCaseNode {
- expr: expr
- .expr()
- .map(|exp|
exp.clone().try_into().map(Box::new))
- .transpose()?,
- when_then_expr: expr
- .when_then_expr()
- .iter()
- .map(|(when_expr, then_expr)| {
- try_parse_when_then_expr(when_expr,
then_expr)
- })
- .collect::<Result<
- Vec<protobuf::PhysicalWhenThen>,
- Self::Error,
- >>()?,
- else_expr: expr
- .else_expr()
- .map(|a|
a.clone().try_into().map(Box::new))
- .transpose()?,
- },
- ),
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
+ binary_expr,
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(
+ protobuf::physical_expr_node::ExprType::Case(
+ Box::new(
+ protobuf::PhysicalCaseNode {
+ expr: expr
+ .expr()
+ .map(|exp| {
+ serialize_expr(exp.clone(),
codec).map(Box::new)
+ })
+ .transpose()?,
+ when_then_expr: expr
+ .when_then_expr()
+ .iter()
+ .map(|(when_expr, then_expr)| {
+ try_parse_when_then_expr(when_expr,
then_expr, codec)
+ })
+ .collect::<Result<
+ Vec<protobuf::PhysicalWhenThen>,
+ DataFusionError,
+ >>()?,
+ else_expr: expr
+ .else_expr()
+ .map(|a| serialize_expr(a.clone(),
codec).map(Box::new))
+ .transpose()?,
+ },
),
),
- })
- } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::NotExpr(
- Box::new(protobuf::PhysicalNot {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
- Box::new(protobuf::PhysicalIsNull {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
- Box::new(protobuf::PhysicalIsNotNull {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(
- protobuf::physical_expr_node::ExprType::InList(
- Box::new(
- protobuf::PhysicalInListNode {
- expr:
Some(Box::new(expr.expr().to_owned().try_into()?)),
- list: expr
- .list()
- .iter()
- .map(|a| a.clone().try_into())
- .collect::<Result<
+ ),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
+ protobuf::PhysicalNot {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ },
+ ))),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
+ Box::new(protobuf::PhysicalIsNull {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ }),
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
+ Box::new(protobuf::PhysicalIsNotNull {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ }),
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(
+ protobuf::physical_expr_node::ExprType::InList(
+ Box::new(
+ protobuf::PhysicalInListNode {
+ expr: Some(Box::new(serialize_expr(
+ expr.expr().to_owned(),
+ codec,
+ )?)),
+ list: expr
+ .list()
+ .iter()
+ .map(|a| serialize_expr(a.clone(), codec))
+ .collect::<Result<
Vec<protobuf::PhysicalExprNode>,
- Self::Error,
+ DataFusionError,
>>()?,
- negated: expr.negated(),
- },
- ),
+ negated: expr.negated(),
+ },
),
),
- })
- } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::Negative(
- Box::new(protobuf::PhysicalNegativeNode {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(lit) = expr.downcast_ref::<Literal>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::Literal(
- lit.value().try_into()?,
- )),
- })
- } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
+ ),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
+ protobuf::PhysicalNegativeNode {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ },
+ ))),
+ })
+ } else if let Some(lit) = expr.downcast_ref::<Literal>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
+ lit.value().try_into()?,
+ )),
+ })
+ } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
+ protobuf::PhysicalCastNode {
+ expr: Some(Box::new(serialize_expr(cast.expr().to_owned(),
codec)?)),
+ arrow_type: Some(cast.cast_type().try_into()?),
+ },
+ ))),
+ })
+ } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
+ protobuf::PhysicalTryCastNode {
+ expr: Some(Box::new(serialize_expr(cast.expr().to_owned(),
codec)?)),
+ arrow_type: Some(cast.cast_type().try_into()?),
+ },
+ ))),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
+ let args: Vec<protobuf::PhysicalExprNode> = expr
+ .args()
+ .iter()
+ .map(|e| serialize_expr(e.to_owned(), codec))
+ .collect::<Result<Vec<_>, _>>()?;
+ if let Ok(fun) = BuiltinScalarFunction::from_str(expr.name()) {
+ let fun: protobuf::ScalarFunction = (&fun).try_into()?;
+
Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
- protobuf::PhysicalCastNode {
- expr: Some(Box::new(cast.expr().clone().try_into()?)),
- arrow_type: Some(cast.cast_type().try_into()?),
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::ScalarFunction(
+ protobuf::PhysicalScalarFunctionNode {
+ name: expr.name().to_string(),
+ fun: fun.into(),
+ args,
+ return_type: Some(expr.return_type().try_into()?),
},
- ))),
- })
- } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::TryCast(
- Box::new(protobuf::PhysicalTryCastNode {
- expr: Some(Box::new(cast.expr().clone().try_into()?)),
- arrow_type: Some(cast.cast_type().try_into()?),
- }),
)),
})
- } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
- let args: Vec<protobuf::PhysicalExprNode> = expr
- .args()
- .iter()
- .map(|e| e.to_owned().try_into())
- .collect::<Result<Vec<_>, _>>()?;
- if let Ok(fun) = BuiltinScalarFunction::from_str(expr.name()) {
- let fun: protobuf::ScalarFunction = (&fun).try_into()?;
-
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(
- protobuf::physical_expr_node::ExprType::ScalarFunction(
- protobuf::PhysicalScalarFunctionNode {
- name: expr.name().to_string(),
- fun: fun.into(),
- args,
- return_type:
Some(expr.return_type().try_into()?),
- },
- ),
- ),
- })
- } else {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
- protobuf::PhysicalScalarUdfNode {
- name: expr.name().to_string(),
- args,
- return_type: Some(expr.return_type().try_into()?),
- },
- )),
- })
- }
- } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
+ } else {
+ let mut buf = Vec::new();
+ // let _ = codec.try_encode_udf(, &mut buf);
Review Comment:
I'm assuming this is what you mean in your comment below, but I think we need
to change:
```
pub struct ScalarFunctionExpr {
fun: ScalarFunctionImplementation,
```
to
```
pub struct ScalarFunctionExpr {
fun: ScalarFunctionDefinition,
```
@alamb This would technically be a breaking change since
`ScalarFunctionExpr` has a public method
```
pub fn fun(&self) -> &ScalarFunctionImplementation {
&self.fun
}
```
We could technically do this in a backwards-compatible way, I think, by
creating a corresponding `ScalarFunctionImplementation` from the
`ScalarFunctionDefinition`, or we could just change the public API. It seems to
me that this would probably not be a very disruptive breaking change, since
`ScalarFunctionImplementation` is just an opaque function pointer, so it is
unlikely that it is being used in application code in a way that wouldn't be
easily adaptable if the method started returning `&ScalarFunctionDefinition`
instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]