yyy1000 commented on code in PR #9436:
URL: https://github.com/apache/arrow-datafusion/pull/9436#discussion_r1510196380


##########
datafusion/proto/src/physical_plan/to_proto.rs:
##########
@@ -374,228 +378,234 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) -> 
Result<AggrFn> {
     Ok(AggrFn { inner, distinct })
 }
 
-impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::PhysicalExprNode {
-    type Error = DataFusionError;
+fn serialize_expr(
+    value: Arc<dyn PhysicalExpr>,
+    codec: &dyn PhysicalExtensionCodec,
+) -> Result<protobuf::PhysicalExprNode, DataFusionError> {
+    let expr = value.as_any();
 
-    fn try_from(value: Arc<dyn PhysicalExpr>) -> Result<Self, Self::Error> {
-        let expr = value.as_any();
-
-        if let Some(expr) = expr.downcast_ref::<Column>() {
-            Ok(protobuf::PhysicalExprNode {
-                expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
-                    protobuf::PhysicalColumn {
-                        name: expr.name().to_string(),
-                        index: expr.index() as u32,
-                    },
-                )),
-            })
-        } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
-            let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
-                l: Some(Box::new(expr.left().to_owned().try_into()?)),
-                r: Some(Box::new(expr.right().to_owned().try_into()?)),
-                op: format!("{:?}", expr.op()),
-            });
+    if let Some(expr) = expr.downcast_ref::<Column>() {
+        Ok(protobuf::PhysicalExprNode {
+            expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
+                protobuf::PhysicalColumn {
+                    name: expr.name().to_string(),
+                    index: expr.index() as u32,
+                },
+            )),
+        })
+    } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
+        let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
+            l: Some(Box::new(serialize_expr(expr.left().clone(), codec)?)),
+            r: Some(Box::new(serialize_expr(expr.right().clone(), codec)?)),
+            op: format!("{:?}", expr.op()),
+        });
 
-            Ok(protobuf::PhysicalExprNode {
-                expr_type: 
Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
-                    binary_expr,
-                )),
-            })
-        } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
-            Ok(protobuf::PhysicalExprNode {
-                expr_type: Some(
-                    protobuf::physical_expr_node::ExprType::Case(
-                        Box::new(
-                            protobuf::PhysicalCaseNode {
-                                expr: expr
-                                    .expr()
-                                    .map(|exp| 
exp.clone().try_into().map(Box::new))
-                                    .transpose()?,
-                                when_then_expr: expr
-                                    .when_then_expr()
-                                    .iter()
-                                    .map(|(when_expr, then_expr)| {
-                                        try_parse_when_then_expr(when_expr, 
then_expr)
-                                    })
-                                    .collect::<Result<
-                                        Vec<protobuf::PhysicalWhenThen>,
-                                        Self::Error,
-                                    >>()?,
-                                else_expr: expr
-                                    .else_expr()
-                                    .map(|a| 
a.clone().try_into().map(Box::new))
-                                    .transpose()?,
-                            },
-                        ),
+        Ok(protobuf::PhysicalExprNode {
+            expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
+                binary_expr,
+            )),
+        })
+    } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
+        Ok(protobuf::PhysicalExprNode {
+            expr_type: Some(
+                protobuf::physical_expr_node::ExprType::Case(
+                    Box::new(
+                        protobuf::PhysicalCaseNode {
+                            expr: expr
+                                .expr()
+                                .map(|exp| {
+                                    serialize_expr(exp.clone(), 
codec).map(Box::new)
+                                })
+                                .transpose()?,
+                            when_then_expr: expr
+                                .when_then_expr()
+                                .iter()
+                                .map(|(when_expr, then_expr)| {
+                                    try_parse_when_then_expr(when_expr, 
then_expr, codec)
+                                })
+                                .collect::<Result<
+                                    Vec<protobuf::PhysicalWhenThen>,
+                                    DataFusionError,
+                                >>()?,
+                            else_expr: expr
+                                .else_expr()
+                                .map(|a| serialize_expr(a.clone(), 
codec).map(Box::new))
+                                .transpose()?,
+                        },
                     ),
                 ),
-            })
-        } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
-            Ok(protobuf::PhysicalExprNode {
-                expr_type: 
Some(protobuf::physical_expr_node::ExprType::NotExpr(
-                    Box::new(protobuf::PhysicalNot {
-                        expr: 
Some(Box::new(expr.arg().to_owned().try_into()?)),
-                    }),
-                )),
-            })
-        } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
-            Ok(protobuf::PhysicalExprNode {
-                expr_type: 
Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
-                    Box::new(protobuf::PhysicalIsNull {
-                        expr: 
Some(Box::new(expr.arg().to_owned().try_into()?)),
-                    }),
-                )),
-            })
-        } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
-            Ok(protobuf::PhysicalExprNode {
-                expr_type: 
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
-                    Box::new(protobuf::PhysicalIsNotNull {
-                        expr: 
Some(Box::new(expr.arg().to_owned().try_into()?)),
-                    }),
-                )),
-            })
-        } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
-            Ok(protobuf::PhysicalExprNode {
-                expr_type: Some(
-                    protobuf::physical_expr_node::ExprType::InList(
-                        Box::new(
-                            protobuf::PhysicalInListNode {
-                                expr: 
Some(Box::new(expr.expr().to_owned().try_into()?)),
-                                list: expr
-                                    .list()
-                                    .iter()
-                                    .map(|a| a.clone().try_into())
-                                    .collect::<Result<
+            ),
+        })
+    } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
+        Ok(protobuf::PhysicalExprNode {
+            expr_type: 
Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
+                protobuf::PhysicalNot {
+                    expr: Some(Box::new(serialize_expr(expr.arg().to_owned(), 
codec)?)),
+                },
+            ))),
+        })
+    } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
+        Ok(protobuf::PhysicalExprNode {
+            expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
+                Box::new(protobuf::PhysicalIsNull {
+                    expr: Some(Box::new(serialize_expr(expr.arg().to_owned(), 
codec)?)),
+                }),
+            )),
+        })
+    } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
+        Ok(protobuf::PhysicalExprNode {
+            expr_type: 
Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
+                Box::new(protobuf::PhysicalIsNotNull {
+                    expr: Some(Box::new(serialize_expr(expr.arg().to_owned(), 
codec)?)),
+                }),
+            )),
+        })
+    } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
+        Ok(protobuf::PhysicalExprNode {
+            expr_type: Some(
+                protobuf::physical_expr_node::ExprType::InList(
+                    Box::new(
+                        protobuf::PhysicalInListNode {
+                            expr: Some(Box::new(serialize_expr(
+                                expr.expr().to_owned(),
+                                codec,
+                            )?)),
+                            list: expr
+                                .list()
+                                .iter()
+                                .map(|a| serialize_expr(a.clone(), codec))
+                                .collect::<Result<
                                     Vec<protobuf::PhysicalExprNode>,
-                                    Self::Error,
+                                    DataFusionError,
                                 >>()?,
-                                negated: expr.negated(),
-                            },
-                        ),
+                            negated: expr.negated(),
+                        },
                     ),
                 ),
-            })
-        } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
-            Ok(protobuf::PhysicalExprNode {
-                expr_type: 
Some(protobuf::physical_expr_node::ExprType::Negative(
-                    Box::new(protobuf::PhysicalNegativeNode {
-                        expr: 
Some(Box::new(expr.arg().to_owned().try_into()?)),
-                    }),
-                )),
-            })
-        } else if let Some(lit) = expr.downcast_ref::<Literal>() {
-            Ok(protobuf::PhysicalExprNode {
-                expr_type: 
Some(protobuf::physical_expr_node::ExprType::Literal(
-                    lit.value().try_into()?,
-                )),
-            })
-        } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
+            ),
+        })
+    } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
+        Ok(protobuf::PhysicalExprNode {
+            expr_type: 
Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
+                protobuf::PhysicalNegativeNode {
+                    expr: Some(Box::new(serialize_expr(expr.arg().to_owned(), 
codec)?)),
+                },
+            ))),
+        })
+    } else if let Some(lit) = expr.downcast_ref::<Literal>() {
+        Ok(protobuf::PhysicalExprNode {
+            expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
+                lit.value().try_into()?,
+            )),
+        })
+    } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
+        Ok(protobuf::PhysicalExprNode {
+            expr_type: 
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
+                protobuf::PhysicalCastNode {
+                    expr: Some(Box::new(serialize_expr(cast.expr().to_owned(), 
codec)?)),
+                    arrow_type: Some(cast.cast_type().try_into()?),
+                },
+            ))),
+        })
+    } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
+        Ok(protobuf::PhysicalExprNode {
+            expr_type: 
Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
+                protobuf::PhysicalTryCastNode {
+                    expr: Some(Box::new(serialize_expr(cast.expr().to_owned(), 
codec)?)),
+                    arrow_type: Some(cast.cast_type().try_into()?),
+                },
+            ))),
+        })
+    } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
+        let args: Vec<protobuf::PhysicalExprNode> = expr
+            .args()
+            .iter()
+            .map(|e| serialize_expr(e.to_owned(), codec))
+            .collect::<Result<Vec<_>, _>>()?;
+        if let Ok(fun) = BuiltinScalarFunction::from_str(expr.name()) {
+            let fun: protobuf::ScalarFunction = (&fun).try_into()?;
+
             Ok(protobuf::PhysicalExprNode {
-                expr_type: 
Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
-                    protobuf::PhysicalCastNode {
-                        expr: Some(Box::new(cast.expr().clone().try_into()?)),
-                        arrow_type: Some(cast.cast_type().try_into()?),
+                expr_type: 
Some(protobuf::physical_expr_node::ExprType::ScalarFunction(
+                    protobuf::PhysicalScalarFunctionNode {
+                        name: expr.name().to_string(),
+                        fun: fun.into(),
+                        args,
+                        return_type: Some(expr.return_type().try_into()?),
                     },
-                ))),
-            })
-        } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
-            Ok(protobuf::PhysicalExprNode {
-                expr_type: 
Some(protobuf::physical_expr_node::ExprType::TryCast(
-                    Box::new(protobuf::PhysicalTryCastNode {
-                        expr: Some(Box::new(cast.expr().clone().try_into()?)),
-                        arrow_type: Some(cast.cast_type().try_into()?),
-                    }),
                 )),
             })
-        } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
-            let args: Vec<protobuf::PhysicalExprNode> = expr
-                .args()
-                .iter()
-                .map(|e| e.to_owned().try_into())
-                .collect::<Result<Vec<_>, _>>()?;
-            if let Ok(fun) = BuiltinScalarFunction::from_str(expr.name()) {
-                let fun: protobuf::ScalarFunction = (&fun).try_into()?;
-
-                Ok(protobuf::PhysicalExprNode {
-                    expr_type: Some(
-                        protobuf::physical_expr_node::ExprType::ScalarFunction(
-                            protobuf::PhysicalScalarFunctionNode {
-                                name: expr.name().to_string(),
-                                fun: fun.into(),
-                                args,
-                                return_type: 
Some(expr.return_type().try_into()?),
-                            },
-                        ),
-                    ),
-                })
-            } else {
-                Ok(protobuf::PhysicalExprNode {
-                    expr_type: 
Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
-                        protobuf::PhysicalScalarUdfNode {
-                            name: expr.name().to_string(),
-                            args,
-                            return_type: Some(expr.return_type().try_into()?),
-                        },
-                    )),
-                })
-            }
-        } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
+        } else {
+            let mut buf = Vec::new();
+            // let _ = codec.try_encode_udf(, &mut buf);

Review Comment:
   I also tried using `create_udf`, but it seems can not work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to