yyy1000 commented on code in PR #9436:
URL: https://github.com/apache/arrow-datafusion/pull/9436#discussion_r1523917015
##########
datafusion/physical-expr/src/scalar_function.rs:
##########
@@ -171,8 +171,17 @@ impl PhysicalExpr for ScalarFunctionExpr {
.collect::<Result<Vec<_>>>()?,
};
+ let fun_implementation = match self.fun {
+ ScalarFunctionDefinition::BuiltIn(ref fun) =>
create_physical_fun(fun)?,
Review Comment:
Here it calls `create_physical_fun` to get the `ScalarFunctionImplementation`.
##########
datafusion/proto/src/physical_plan/from_proto.rs:
##########
@@ -110,7 +114,9 @@ pub fn parse_physical_sort_exprs(
.iter()
.map(|sort_expr| {
if let Some(expr) = &sort_expr.expr {
- let expr = parse_physical_expr(expr.as_ref(), registry,
input_schema)?;
+ let codec = DefaultPhysicalExtensionCodec {};
+ let expr =
+ parse_physical_expr(expr.as_ref(), registry, input_schema,
&codec)?;
Review Comment:
I created a `DefaultPhysicalExtensionCodec` for the cases that do not need an
`ExtensionCodec`.
##########
datafusion/proto/src/physical_plan/to_proto.rs:
##########
@@ -374,195 +381,215 @@ fn aggr_expr_to_aggr_fn(expr: &dyn AggregateExpr) ->
Result<AggrFn> {
Ok(AggrFn { inner, distinct })
}
-impl TryFrom<Arc<dyn PhysicalExpr>> for protobuf::PhysicalExprNode {
- type Error = DataFusionError;
-
- fn try_from(value: Arc<dyn PhysicalExpr>) -> Result<Self, Self::Error> {
- let expr = value.as_any();
+pub fn serialize_expr(
+ value: Arc<dyn PhysicalExpr>,
+ codec: &dyn PhysicalExtensionCodec,
+) -> Result<protobuf::PhysicalExprNode, DataFusionError> {
+ let expr = value.as_any();
Review Comment:
Applied the method from LogicalExpr
##########
datafusion/physical-expr/src/functions.rs:
##########
@@ -69,14 +70,12 @@ pub fn create_physical_expr(
let data_type = fun.return_type(&input_expr_types)?;
- let fun_expr: ScalarFunctionImplementation =
- create_physical_fun(fun, execution_props)?;
-
let monotonicity = fun.monotonicity();
+ let fun_def = ScalarFunctionDefinition::BuiltIn(*fun);
Review Comment:
Here I just pass the `fun_def`; `create_physical_fun` will be called when the
`PhysicalExpr` is executed.
##########
datafusion/proto/src/physical_plan/from_proto.rs:
##########
@@ -348,19 +356,22 @@ pub fn parse_physical_expr(
)?
}
ExprType::ScalarUdf(e) => {
- let udf = registry.udf(e.name.as_str())?;
+ let udf = match &e.fun_definition {
+ Some(buf) => codec.try_decode_udf(&e.name, buf)?,
+ None => registry.udf(e.name.as_str())?,
+ };
Review Comment:
Here is the `decode` logic
##########
datafusion/physical-expr/src/functions.rs:
##########
@@ -57,7 +58,7 @@ pub fn create_physical_expr(
fun: &BuiltinScalarFunction,
input_phy_exprs: &[Arc<dyn PhysicalExpr>],
input_schema: &Schema,
- execution_props: &ExecutionProps,
+ _execution_props: &ExecutionProps,
Review Comment:
Here the `ExecutionProps` param can be removed, but I'd like to do that in a
follow-up PR, because doing everything in one PR would make it harder for me
to track down errors.
##########
datafusion/proto/tests/cases/roundtrip_physical_plan.rs:
##########
@@ -665,6 +670,133 @@ fn roundtrip_scalar_udf() -> Result<()> {
roundtrip_test_with_context(Arc::new(project), ctx)
}
+#[test]
Review Comment:
Here is the test case, mainly copied from the `LogicalExpr` serde test.
##########
datafusion/core/src/physical_optimizer/projection_pushdown.rs:
##########
@@ -1345,7 +1385,9 @@ mod tests {
Arc::new(NegativeExpr::new(Arc::new(Column::new("f", 4)))),
Arc::new(ScalarFunctionExpr::new(
"scalar_expr",
- Arc::new(|_: &[ColumnarValue]| unimplemented!("not
implemented")),
+
ScalarFunctionDefinition::UDF(Arc::new(ScalarUDF::new_from_impl(
+ DummyUDF::new(),
+ ))),
Review Comment:
Since `ScalarFunctionExpr` needs a `ScalarFunctionDefinition` here, I
created one for each. I don't know whether there's a better way to do it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]