yyy1000 commented on code in PR #9395:
URL: https://github.com/apache/arrow-datafusion/pull/9395#discussion_r1508504101
##########
datafusion/proto/tests/cases/roundtrip_logical_plan.rs:
##########
@@ -631,6 +638,12 @@ pub mod proto {
#[prost(uint64, tag = "1")]
pub k: u64,
}
+
+ #[derive(Clone, PartialEq, ::prost::Message)]
+ pub struct ScalarUDFProto {
+ #[prost(message, tag = "1")]
+ pub expr:
::core::option::Option<datafusion_proto::protobuf::LogicalExprNode>,
+ }
Review Comment:
This is the proto message I use for the test.
##########
datafusion/proto/tests/cases/roundtrip_logical_plan.rs:
##########
@@ -756,6 +769,82 @@ impl LogicalExtensionCodec for TopKExtensionCodec {
}
}
+#[derive(Debug)]
+pub struct ScalarUDFExtensionCodec {}
+
+impl LogicalExtensionCodec for ScalarUDFExtensionCodec {
+ fn try_decode(
+ &self,
+ _buf: &[u8],
+ _inputs: &[LogicalPlan],
+ _ctx: &SessionContext,
+ ) -> Result<Extension> {
+ not_impl_err!("No extension codec provided")
+ }
+
+ fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
+ not_impl_err!("No extension codec provided")
+ }
+
+ fn try_decode_table_provider(
+ &self,
+ _buf: &[u8],
+ _schema: SchemaRef,
+ _ctx: &SessionContext,
+ ) -> Result<Arc<dyn TableProvider>> {
+ internal_err!("unsupported plan type")
+ }
+
+ fn try_encode_table_provider(
+ &self,
+ _node: Arc<dyn TableProvider>,
+ _buf: &mut Vec<u8>,
+ ) -> Result<()> {
+ internal_err!("unsupported plan type")
+ }
+
+ fn try_decode_udf(
+ &self,
+ _name: &str,
+ buf: &[u8],
+ ctx: &dyn FunctionRegistry,
+ ) -> Result<Arc<ScalarUDF>> {
+ let msg = ScalarUDFProto::decode(buf).map_err(|_| {
+ DataFusionError::Internal("Error decoding test table".to_string())
+ })?;
+ if let Some(expr) = msg.expr.as_ref() {
+ let node = from_proto::parse_expr(expr, ctx, self)?;
Review Comment:
Here, if I want to convert a `LogicalExprNode` to an `Expr`, I have to call
`parse_expr`.
However, `parse_expr` will call `codec.try_decode_udf`, which leads to a
circular call.
That leaves me a little confused about how to implement this method.
##########
datafusion/proto/tests/cases/roundtrip_logical_plan.rs:
##########
@@ -756,6 +769,82 @@ impl LogicalExtensionCodec for TopKExtensionCodec {
}
}
+#[derive(Debug)]
+pub struct ScalarUDFExtensionCodec {}
+
+impl LogicalExtensionCodec for ScalarUDFExtensionCodec {
+ fn try_decode(
+ &self,
+ _buf: &[u8],
+ _inputs: &[LogicalPlan],
+ _ctx: &SessionContext,
+ ) -> Result<Extension> {
+ not_impl_err!("No extension codec provided")
+ }
+
+ fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<()> {
+ not_impl_err!("No extension codec provided")
+ }
+
+ fn try_decode_table_provider(
+ &self,
+ _buf: &[u8],
+ _schema: SchemaRef,
+ _ctx: &SessionContext,
+ ) -> Result<Arc<dyn TableProvider>> {
+ internal_err!("unsupported plan type")
+ }
+
+ fn try_encode_table_provider(
+ &self,
+ _node: Arc<dyn TableProvider>,
+ _buf: &mut Vec<u8>,
+ ) -> Result<()> {
+ internal_err!("unsupported plan type")
+ }
+
+ fn try_decode_udf(
+ &self,
+ _name: &str,
+ buf: &[u8],
+ ctx: &dyn FunctionRegistry,
+ ) -> Result<Arc<ScalarUDF>> {
+ let msg = ScalarUDFProto::decode(buf).map_err(|_| {
+ DataFusionError::Internal("Error decoding test table".to_string())
+ })?;
+ if let Some(expr) = msg.expr.as_ref() {
+ let node = from_proto::parse_expr(expr, ctx, self)?;
+ match node {
+ Expr::ScalarFunction(ScalarFunction { func_def, args: _ }) => {
+ match func_def {
+ ScalarFunctionDefinition::UDF(fun) => Ok(fun),
+ _ => internal_err!("invalid plan, no udf"),
+ }
+ }
+ _ => internal_err!("invalid plan, no udf"),
+ }
+ } else {
+ internal_err!("invalid plan, no expr")
+ }
+ }
+
+ fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) ->
Result<()> {
+ let func = Expr::ScalarFunction(ScalarFunction {
+ func_def: ScalarFunctionDefinition::UDF(Arc::new(node.clone())),
+ args: vec![],
+ });
+
+ let proto = proto::ScalarUDFProto {
+ expr: Some(serialize_expr(&func, self)?),
Review Comment:
The same circular-call issue applies here with `serialize_expr`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]