yyy1000 commented on code in PR #9436:
URL: https://github.com/apache/arrow-datafusion/pull/9436#discussion_r1510196380
##########
datafusion/proto/src/physical_plan/to_proto.rs:
##########
@@ -374,228 +378,234 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) ->
Result<AggrFn> {
Ok(AggrFn { inner, distinct })
}
-impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::PhysicalExprNode {
- type Error = DataFusionError;
+fn serialize_expr(
+ value: Arc<dyn PhysicalExpr>,
+ codec: &dyn PhysicalExtensionCodec,
+) -> Result<protobuf::PhysicalExprNode, DataFusionError> {
+ let expr = value.as_any();
- fn try_from(value: Arc<dyn PhysicalExpr>) -> Result<Self, Self::Error> {
- let expr = value.as_any();
-
- if let Some(expr) = expr.downcast_ref::<Column>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
- protobuf::PhysicalColumn {
- name: expr.name().to_string(),
- index: expr.index() as u32,
- },
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
- let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
- l: Some(Box::new(expr.left().to_owned().try_into()?)),
- r: Some(Box::new(expr.right().to_owned().try_into()?)),
- op: format!("{:?}", expr.op()),
- });
+ if let Some(expr) = expr.downcast_ref::<Column>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
+ protobuf::PhysicalColumn {
+ name: expr.name().to_string(),
+ index: expr.index() as u32,
+ },
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
+ let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
+ l: Some(Box::new(serialize_expr(expr.left().clone(), codec)?)),
+ r: Some(Box::new(serialize_expr(expr.right().clone(), codec)?)),
+ op: format!("{:?}", expr.op()),
+ });
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
- binary_expr,
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(
- protobuf::physical_expr_node::ExprType::Case(
- Box::new(
- protobuf::PhysicalCaseNode {
- expr: expr
- .expr()
- .map(|exp|
exp.clone().try_into().map(Box::new))
- .transpose()?,
- when_then_expr: expr
- .when_then_expr()
- .iter()
- .map(|(when_expr, then_expr)| {
- try_parse_when_then_expr(when_expr,
then_expr)
- })
- .collect::<Result<
- Vec<protobuf::PhysicalWhenThen>,
- Self::Error,
- >>()?,
- else_expr: expr
- .else_expr()
- .map(|a|
a.clone().try_into().map(Box::new))
- .transpose()?,
- },
- ),
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
+ binary_expr,
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(
+ protobuf::physical_expr_node::ExprType::Case(
+ Box::new(
+ protobuf::PhysicalCaseNode {
+ expr: expr
+ .expr()
+ .map(|exp| {
+ serialize_expr(exp.clone(),
codec).map(Box::new)
+ })
+ .transpose()?,
+ when_then_expr: expr
+ .when_then_expr()
+ .iter()
+ .map(|(when_expr, then_expr)| {
+ try_parse_when_then_expr(when_expr,
then_expr, codec)
+ })
+ .collect::<Result<
+ Vec<protobuf::PhysicalWhenThen>,
+ DataFusionError,
+ >>()?,
+ else_expr: expr
+ .else_expr()
+ .map(|a| serialize_expr(a.clone(),
codec).map(Box::new))
+ .transpose()?,
+ },
),
),
- })
- } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::NotExpr(
- Box::new(protobuf::PhysicalNot {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
- Box::new(protobuf::PhysicalIsNull {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
- Box::new(protobuf::PhysicalIsNotNull {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(
- protobuf::physical_expr_node::ExprType::InList(
- Box::new(
- protobuf::PhysicalInListNode {
- expr:
Some(Box::new(expr.expr().to_owned().try_into()?)),
- list: expr
- .list()
- .iter()
- .map(|a| a.clone().try_into())
- .collect::<Result<
+ ),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
+ protobuf::PhysicalNot {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ },
+ ))),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
+ Box::new(protobuf::PhysicalIsNull {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ }),
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
+ Box::new(protobuf::PhysicalIsNotNull {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ }),
+ )),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(
+ protobuf::physical_expr_node::ExprType::InList(
+ Box::new(
+ protobuf::PhysicalInListNode {
+ expr: Some(Box::new(serialize_expr(
+ expr.expr().to_owned(),
+ codec,
+ )?)),
+ list: expr
+ .list()
+ .iter()
+ .map(|a| serialize_expr(a.clone(), codec))
+ .collect::<Result<
Vec<protobuf::PhysicalExprNode>,
- Self::Error,
+ DataFusionError,
>>()?,
- negated: expr.negated(),
- },
- ),
+ negated: expr.negated(),
+ },
),
),
- })
- } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::Negative(
- Box::new(protobuf::PhysicalNegativeNode {
- expr:
Some(Box::new(expr.arg().to_owned().try_into()?)),
- }),
- )),
- })
- } else if let Some(lit) = expr.downcast_ref::<Literal>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::Literal(
- lit.value().try_into()?,
- )),
- })
- } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
+ ),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
+ protobuf::PhysicalNegativeNode {
+ expr: Some(Box::new(serialize_expr(expr.arg().to_owned(),
codec)?)),
+ },
+ ))),
+ })
+ } else if let Some(lit) = expr.downcast_ref::<Literal>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
+ lit.value().try_into()?,
+ )),
+ })
+ } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
+ protobuf::PhysicalCastNode {
+ expr: Some(Box::new(serialize_expr(cast.expr().to_owned(),
codec)?)),
+ arrow_type: Some(cast.cast_type().try_into()?),
+ },
+ ))),
+ })
+ } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
+ Ok(protobuf::PhysicalExprNode {
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
+ protobuf::PhysicalTryCastNode {
+ expr: Some(Box::new(serialize_expr(cast.expr().to_owned(),
codec)?)),
+ arrow_type: Some(cast.cast_type().try_into()?),
+ },
+ ))),
+ })
+ } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
+ let args: Vec<protobuf::PhysicalExprNode> = expr
+ .args()
+ .iter()
+ .map(|e| serialize_expr(e.to_owned(), codec))
+ .collect::<Result<Vec<_>, _>>()?;
+ if let Ok(fun) = BuiltinScalarFunction::from_str(expr.name()) {
+ let fun: protobuf::ScalarFunction = (&fun).try_into()?;
+
Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
- protobuf::PhysicalCastNode {
- expr: Some(Box::new(cast.expr().clone().try_into()?)),
- arrow_type: Some(cast.cast_type().try_into()?),
+ expr_type:
Some(protobuf::physical_expr_node::ExprType::ScalarFunction(
+ protobuf::PhysicalScalarFunctionNode {
+ name: expr.name().to_string(),
+ fun: fun.into(),
+ args,
+ return_type: Some(expr.return_type().try_into()?),
},
- ))),
- })
- } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::TryCast(
- Box::new(protobuf::PhysicalTryCastNode {
- expr: Some(Box::new(cast.expr().clone().try_into()?)),
- arrow_type: Some(cast.cast_type().try_into()?),
- }),
)),
})
- } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
- let args: Vec<protobuf::PhysicalExprNode> = expr
- .args()
- .iter()
- .map(|e| e.to_owned().try_into())
- .collect::<Result<Vec<_>, _>>()?;
- if let Ok(fun) = BuiltinScalarFunction::from_str(expr.name()) {
- let fun: protobuf::ScalarFunction = (&fun).try_into()?;
-
- Ok(protobuf::PhysicalExprNode {
- expr_type: Some(
- protobuf::physical_expr_node::ExprType::ScalarFunction(
- protobuf::PhysicalScalarFunctionNode {
- name: expr.name().to_string(),
- fun: fun.into(),
- args,
- return_type:
Some(expr.return_type().try_into()?),
- },
- ),
- ),
- })
- } else {
- Ok(protobuf::PhysicalExprNode {
- expr_type:
Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
- protobuf::PhysicalScalarUdfNode {
- name: expr.name().to_string(),
- args,
- return_type: Some(expr.return_type().try_into()?),
- },
- )),
- })
- }
- } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
+ } else {
+ let mut buf = Vec::new();
+ // let _ = codec.try_encode_udf(, &mut buf);
Review Comment:
I also tried using `create_udf`, but it doesn't seem to work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]