thinkharderdev commented on issue #8706:
URL: 
https://github.com/apache/arrow-datafusion/issues/8706#issuecomment-1964051200

   > @thinkharderdev Much appreciated! I understand the issue now. Another 
question is there some example code for `deserialization` so that I can refer 
to them to think how to write `serialize_expr` function?
   
   Hey @yyy1000, looking at this a bit and I think what we want here is:
   
   1. Have `ScalarUDFExprNode` take an optional opaque payload to potentially 
contain the serialized function:
   ```
   message ScalarUDFExprNode {
     string fun_name = 1;
     repeated LogicalExprNode args = 2;
     optional bytes fun_definition = 3;
   }
   ```
   
   We would do the same for `AggregateUDFExprNode` and `WindowExprNode`.
   
   2. Add a new trait to shim in custom function serialization
   ```
   pub trait FunctionExtensionCodec: Debug + Send + Sync {
       fn try_decode_udf(
           &self,
           name: &str,
           buf: &[u8],
       ) -> Result<Arc<ScalarUDF>>;
   
       fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()>;
   
       fn try_decode_udaf(
           &self,
           name: &str,
           buf: &[u8],
       ) -> Result<Arc<AggregateUDF>>;
   
       fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()>;
   
       fn try_decode_udwf(
           &self,
           name: &str,
           buf: &[u8],
       ) -> Result<Arc<WindowUDF>>;
   
       fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()>;
   }
   ```
   
   This trait should have a default implementation which just returns a `NotImplemented` error (similar to `DefaultLogicalExtensionCodec`).
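As a rough illustration of that default, here is a minimal sketch showing only the scalar-UDF half of the trait. The `ScalarUDF` and `Error` types are simplified stand-ins so the block compiles without DataFusion, and the name `DefaultFunctionExtensionCodec` is an assumption, not an existing type.

```rust
use std::fmt::Debug;
use std::sync::Arc;

// Hypothetical stand-in for DataFusion's ScalarUDF (assumption for this sketch).
#[derive(Debug)]
pub struct ScalarUDF {
    pub name: String,
}

// Hypothetical stand-in for the proto crate's error type.
#[derive(Debug, PartialEq)]
pub enum Error {
    NotImplemented(String),
}

pub type Result<T> = std::result::Result<T, Error>;

// Only the scalar-UDF methods of the proposed trait are reproduced here.
pub trait FunctionExtensionCodec: Debug + Send + Sync {
    fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>>;
    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()>;
}

/// Hypothetical default codec: it does not know how to serialize any function.
#[derive(Debug, Default)]
pub struct DefaultFunctionExtensionCodec;

impl FunctionExtensionCodec for DefaultFunctionExtensionCodec {
    fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> Result<Arc<ScalarUDF>> {
        Err(Error::NotImplemented(format!(
            "decoding UDF '{name}' is not supported"
        )))
    }

    fn try_encode_udf(&self, node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<()> {
        // The buffer is left untouched; the caller only sees the error.
        Err(Error::NotImplemented(format!(
            "encoding UDF '{}' is not supported",
            node.name
        )))
    }
}
```

One thing to weigh: if the default encoder returns an error, the `?` in the `Expr::ScalarFunction` handling in step 4 would fail for every UDF when no custom codec is registered. Having the default encoder return `Ok(())` and leave the buffer empty may fit the `buf.is_empty()` check better.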
   
   3. Modify the `FunctionRegistry` to allow registering a custom function codec
   
   ```
   pub trait FunctionRegistry {
     // ... current methods remain unchanged
   
     fn register_extension_codec(&self, codec: Arc<dyn FunctionExtensionCodec>);
   
     fn extension_codec(&self) -> &dyn FunctionExtensionCodec;
   
   }
   ```
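Because `register_extension_codec` takes `&self`, an implementor would need some form of interior mutability. The sketch below uses an `RwLock` and, as a deliberate deviation from the signature above, returns an `Arc` from `extension_codec` since a plain `&dyn` borrow cannot outlive the lock guard. `SimpleRegistry`, `DefaultCodec`, `CustomCodec` and the `codec_name` method are all hypothetical names used only for illustration.

```rust
use std::fmt::Debug;
use std::sync::{Arc, RwLock};

// Trimmed-down, hypothetical version of the codec trait; `codec_name`
// exists only so the sketch can tell the two codecs apart.
pub trait FunctionExtensionCodec: Debug + Send + Sync {
    fn codec_name(&self) -> &str;
}

#[derive(Debug)]
pub struct DefaultCodec;

impl FunctionExtensionCodec for DefaultCodec {
    fn codec_name(&self) -> &str {
        "default"
    }
}

#[derive(Debug)]
pub struct CustomCodec;

impl FunctionExtensionCodec for CustomCodec {
    fn codec_name(&self) -> &str {
        "custom"
    }
}

// Hypothetical registry that stores the codec behind a lock so that it
// can be swapped through a shared reference.
pub struct SimpleRegistry {
    codec: RwLock<Arc<dyn FunctionExtensionCodec>>,
}

impl SimpleRegistry {
    pub fn new() -> Self {
        let default: Arc<dyn FunctionExtensionCodec> = Arc::new(DefaultCodec);
        Self {
            codec: RwLock::new(default),
        }
    }

    pub fn register_extension_codec(&self, codec: Arc<dyn FunctionExtensionCodec>) {
        *self.codec.write().unwrap() = codec;
    }

    // Returns a cheap clone of the Arc instead of a borrowed `&dyn`.
    pub fn extension_codec(&self) -> Arc<dyn FunctionExtensionCodec> {
        let guard = self.codec.read().unwrap();
        Arc::clone(&*guard)
    }
}
```

If the `&dyn FunctionExtensionCodec` return type is important, taking `&mut self` in `register_extension_codec` would be the simpler alternative.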
   
   4. Replace `impl TryFrom<&Expr> for protobuf::LogicalExprNode` with a free 
function
   ```
   pub fn serialize_expr(expr: &Expr, registry: &dyn FunctionRegistry) -> Result<protobuf::LogicalExprNode> {
      ...
   }
   ```
   
   This would be mostly unchanged from the existing `TryFrom` implementation 
except for handling of `Expr::ScalarFunction/AggregateFunction/WindowFunction`. 
We would handle `Expr::ScalarFunction` something like:
   
   ```
               Expr::ScalarFunction(ScalarFunction { func_def, args }) => {
                   // Recurse through the free function so nested UDFs see the registry too
                   let args = args
                       .iter()
                       .map(|expr| serialize_expr(expr, registry))
                       .collect::<Result<Vec<_>, Error>>()?;
                   match func_def {
                       ScalarFunctionDefinition::BuiltIn(fun) => {
                           let fun: protobuf::ScalarFunction = fun.try_into()?;
                           protobuf::LogicalExprNode {
                               expr_type: Some(ExprType::ScalarFunction(
                                   protobuf::ScalarFunctionNode {
                                       fun: fun.into(),
                                       args,
                                   },
                               )),
                           }
                       }
                       ScalarFunctionDefinition::UDF(fun) => {
                           // Give the codec a chance to serialize the function itself
                           let mut buf = Vec::new();
                           registry.extension_codec().try_encode_udf(fun.as_ref(), &mut buf)?;
   
                           // An empty buffer means the UDF is referenced by name only
                           let fun_definition = if buf.is_empty() {
                               None
                           } else {
                               Some(buf)
                           };
   
                           protobuf::LogicalExprNode {
                               expr_type: Some(ExprType::ScalarUdfExpr(
                                   protobuf::ScalarUdfExprNode {
                                       fun_name: fun.name().to_string(),
                                       fun_definition,
                                       args,
                                   },
                               )),
                           }
                       }
                       ScalarFunctionDefinition::Name(_) => {
                           return Err(Error::NotImplemented(
                               "Proto serialization error: Trying to serialize an unresolved function"
                                   .to_string(),
                           ));
                       }
                   }
               }
   
   ```
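To make the "empty buffer means name only" convention concrete, here is a small self-contained sketch of just the UDF branch. All types are simplified stand-ins: `UdfEncoder`, `NameOnlyCodec`, `StateCodec`, `encode_udf_node` and the `state` field are hypothetical and not part of DataFusion.

```rust
// Hypothetical stand-in for a UDF that carries some serializable state.
#[derive(Debug)]
pub struct ScalarUDF {
    pub name: String,
    pub state: Vec<u8>,
}

// Simplified mirror of the proposed protobuf message.
#[derive(Debug, PartialEq)]
pub struct ScalarUdfExprNode {
    pub fun_name: String,
    pub fun_definition: Option<Vec<u8>>,
}

// Encode half of the proposed codec trait, with String as the error type.
pub trait UdfEncoder {
    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<(), String>;
}

// Writes nothing, so the UDF is looked up by name when deserialized.
pub struct NameOnlyCodec;

impl UdfEncoder for NameOnlyCodec {
    fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) -> Result<(), String> {
        Ok(())
    }
}

// Copies the UDF's state bytes into the payload.
pub struct StateCodec;

impl UdfEncoder for StateCodec {
    fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<(), String> {
        buf.extend_from_slice(&node.state);
        Ok(())
    }
}

// Same shape as the UDF branch above: encode, then map "empty" to None.
pub fn encode_udf_node(
    fun: &ScalarUDF,
    codec: &dyn UdfEncoder,
) -> Result<ScalarUdfExprNode, String> {
    let mut buf = Vec::new();
    codec.try_encode_udf(fun, &mut buf)?;
    let fun_definition = if buf.is_empty() { None } else { Some(buf) };
    Ok(ScalarUdfExprNode {
        fun_name: fun.name.clone(),
        fun_definition,
    })
}
```

A side effect of this convention is that a codec cannot distinguish "no payload" from "a payload of zero bytes", which seems acceptable as long as codecs always write at least one byte when they handle a function.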
   
   5. Similarly, in the existing `parse_expr`, we try to use the extension codec to deserialize the function if `fun_definition` is present:
   ```
   pub fn parse_expr(
       proto: &protobuf::LogicalExprNode,
       registry: &dyn FunctionRegistry,
   ) -> Result<Expr, Error> {
     ... handling of other expr types unchanged
     
     ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode { fun_name, fun_definition, args }) => {
         let scalar_fn = match fun_definition {
             Some(buf) => registry.extension_codec().try_decode_udf(&fun_name, &buf)?,
             None => registry.udf(fun_name.as_str())?,
         };
         Ok(Expr::ScalarFunction(expr::ScalarFunction::new_udf(
             scalar_fn,
             args.iter()
                 .map(|expr| parse_expr(expr, registry))
                 .collect::<Result<Vec<_>, Error>>()?,
         )))
     }
   
   }
   ```
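The decode-side fallback can be sketched in isolation as follows. Again, every type here is a simplified stand-in: `Registry`, `PayloadCodec`, `resolve_udf` and the `state` field are hypothetical names, and `String` stands in for the real error type.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for a UDF that can be rebuilt from a payload.
#[derive(Debug, PartialEq)]
pub struct ScalarUDF {
    pub name: String,
    pub state: Vec<u8>,
}

// Hypothetical codec: rebuilds a UDF from the serialized payload.
pub struct PayloadCodec;

impl PayloadCodec {
    pub fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>, String> {
        Ok(Arc::new(ScalarUDF {
            name: name.to_string(),
            state: buf.to_vec(),
        }))
    }
}

// Hypothetical registry: a name lookup table plus the extension codec.
pub struct Registry {
    pub udfs: HashMap<String, Arc<ScalarUDF>>,
    pub codec: PayloadCodec,
}

impl Registry {
    pub fn udf(&self, name: &str) -> Result<Arc<ScalarUDF>, String> {
        self.udfs
            .get(name)
            .cloned()
            .ok_or_else(|| format!("no UDF named '{name}'"))
    }

    pub fn extension_codec(&self) -> &PayloadCodec {
        &self.codec
    }
}

// Prefer the serialized definition when present, else look up by name.
pub fn resolve_udf(
    registry: &Registry,
    fun_name: &str,
    fun_definition: Option<&[u8]>,
) -> Result<Arc<ScalarUDF>, String> {
    match fun_definition {
        Some(buf) => registry.extension_codec().try_decode_udf(fun_name, buf),
        None => registry.udf(fun_name),
    }
}
```

With this shape, plans serialized before the change (no `fun_definition`) keep resolving through the registry by name, so the new field stays backward compatible.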
   
   If you don't have bandwidth to work on this right now, let me know. My team can take this up, as we are hoping to use this functionality soon.
   

