Jefffrey commented on code in PR #18167:
URL: https://github.com/apache/datafusion/pull/18167#discussion_r2444202553


##########
datafusion/proto/src/bytes/mod.rs:
##########
@@ -104,95 +96,106 @@ impl Serializeable for Expr {
         // Until the underlying prost issue ( 
https://github.com/tokio-rs/prost/issues/736 ) is fixed, we try to
         // deserialize the data here and check for errors.
         //
-        // Need to provide some placeholder registry because the stream may 
contain UDFs
-        struct PlaceHolderRegistry;
-
-        impl FunctionRegistry for PlaceHolderRegistry {
-            fn udfs(&self) -> std::collections::HashSet<String> {
-                std::collections::HashSet::default()
-            }
-
-            fn udf(&self, name: &str) -> 
Result<Arc<datafusion_expr::ScalarUDF>> {
+        // Need to provide some placeholder codec because the stream may 
contain UDFs
+        // (using codec since with TaskContext we can't pass through unknown 
udfs; it
+        // requires registering them beforehand)
+        #[derive(Debug)]
+        struct PlaceholderLogicalExtensionCodec {}
+        impl LogicalExtensionCodec for PlaceholderLogicalExtensionCodec {
+            fn try_decode_udf(&self, name: &str, _buf: &[u8]) -> 
Result<Arc<ScalarUDF>> {
                 Ok(Arc::new(create_udf(
                     name,
                     vec![],
-                    arrow::datatypes::DataType::Null,
+                    DataType::Null,
                     Volatility::Immutable,
                     Arc::new(|_| unimplemented!()),
                 )))
             }
 
-            fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
+            fn try_decode_udaf(
+                &self,
+                name: &str,
+                _buf: &[u8],
+            ) -> Result<Arc<AggregateUDF>> {
                 Ok(Arc::new(create_udaf(
                     name,
-                    vec![arrow::datatypes::DataType::Null],
-                    Arc::new(arrow::datatypes::DataType::Null),
+                    vec![DataType::Null],
+                    Arc::new(DataType::Null),
                     Volatility::Immutable,
                     Arc::new(|_| unimplemented!()),
                     Arc::new(vec![]),
                 )))
             }
 
-            fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
+            fn try_decode_udwf(&self, name: &str, _buf: &[u8]) -> 
Result<Arc<WindowUDF>> {
                 Ok(Arc::new(create_udwf(
                     name,
-                    arrow::datatypes::DataType::Null,
-                    Arc::new(arrow::datatypes::DataType::Null),
+                    DataType::Null,
+                    Arc::new(DataType::Null),
                     Volatility::Immutable,
                     Arc::new(|| unimplemented!()),
                 )))
             }
-            fn register_udaf(
-                &mut self,
-                _udaf: Arc<AggregateUDF>,
-            ) -> Result<Option<Arc<AggregateUDF>>> {
-                datafusion_common::internal_err!(
-                    "register_udaf called in Placeholder Registry!"
-                )
-            }
-            fn register_udf(
-                &mut self,
-                _udf: Arc<datafusion_expr::ScalarUDF>,
-            ) -> Result<Option<Arc<datafusion_expr::ScalarUDF>>> {
-                datafusion_common::internal_err!(
-                    "register_udf called in Placeholder Registry!"
-                )
-            }
-            fn register_udwf(
-                &mut self,
-                _udaf: Arc<WindowUDF>,
-            ) -> Result<Option<Arc<WindowUDF>>> {
-                datafusion_common::internal_err!(
-                    "register_udwf called in Placeholder Registry!"
-                )
+
+            fn try_decode(
+                &self,
+                _buf: &[u8],
+                _inputs: &[LogicalPlan],
+                _ctx: &TaskContext,
+            ) -> Result<Extension> {
+                unimplemented!()
             }
 
-            fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
-                vec![]
+            fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> 
Result<()> {
+                unimplemented!()
             }
 
-            fn udafs(&self) -> std::collections::HashSet<String> {
-                std::collections::HashSet::default()
+            fn try_decode_table_provider(
+                &self,
+                _buf: &[u8],
+                _table_ref: &TableReference,
+                _schema: SchemaRef,
+                _ctx: &TaskContext,
+            ) -> Result<Arc<dyn TableProvider>> {
+                unimplemented!()
             }
 
-            fn udwfs(&self) -> std::collections::HashSet<String> {
-                std::collections::HashSet::default()
+            fn try_encode_table_provider(
+                &self,
+                _table_ref: &TableReference,
+                _node: Arc<dyn TableProvider>,
+                _buf: &mut Vec<u8>,
+            ) -> Result<()> {
+                unimplemented!()
             }
         }
-        Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;
+
+        // Copied from from_bytes_with_ctx below but with placeholder registry 
instead of
+        // default.
+        {
+            let bytes: &[u8] = &bytes;
+            let protobuf = 
protobuf::LogicalExprNode::decode(bytes).map_err(|e| {
+                plan_datafusion_err!("Error decoding expr as protobuf: {e}")
+            })?;
+
+            let extension_codec = PlaceholderLogicalExtensionCodec {};
+            logical_plan::from_proto::parse_expr(
+                &protobuf,
+                &TaskContext::default(),
+                &extension_codec,
+            )
+            .map_err(|e| plan_datafusion_err!("Error parsing protobuf into 
Expr: {e}"))?;
+        }

Review Comment:
   This is a bit ugly, but it's necessary: since we now require `TaskContext`
instead of `dyn FunctionRegistry`, there is no easy way within `TaskContext` to
just pass through arbitrary UDFs/UDAFs/UDWFs the way `PlaceHolderRegistry` was
meant to. Instead, we achieve the same behaviour via the codec, so I'm copying
the `from_bytes_with_ctx` code into here to customize the codec we pass in.



##########
datafusion/proto/src/logical_plan/from_proto.rs:
##########
@@ -635,13 +635,33 @@ pub fn parse_expr(
                 Some(data_type.try_into()?),
             ))),
         },
+        ExprType::ScalarSubquery(scalar_subquery) => {
+            let subquery = scalar_subquery
+                .subquery
+                .as_ref()
+                .ok_or_else(|| Error::required("subquery"))?;
+            Ok(Expr::ScalarSubquery(Subquery {
+                subquery: Arc::new(subquery.try_into_logical_plan(ctx, 
codec)?),
+                outer_ref_columns: parse_exprs(
+                    &scalar_subquery.outer_ref_columns,
+                    ctx,
+                    codec,
+                )?,
+                spans: Spans::new(),
+            }))
+        }
+        ExprType::OuterReferenceColumn(OuterReferenceColumn { field, column }) 
=> {
+            let column = column.to_owned().ok_or_else(|| 
Error::required("column"))?;
+            let field = field.as_ref().required("field")?;
+            Ok(Expr::OuterReferenceColumn(Arc::new(field), column.into()))
+        }

Review Comment:
   Actual implementation for deserializing



##########
datafusion/proto/src/bytes/mod.rs:
##########
@@ -66,24 +63,19 @@ pub trait Serializeable: Sized {
 
     /// Convert `bytes` (the output of [`to_bytes`]) back into an
     /// object. This will error if the serialized bytes contain any
-    /// user defined functions, in which case use
-    /// [`from_bytes_with_registry`]
+    /// user defined functions, in which case use [`from_bytes_with_ctx`].
     ///
     /// [`to_bytes`]: Self::to_bytes
     /// [`from_bytes_with_registry`]: Self::from_bytes_with_registry
     fn from_bytes(bytes: &[u8]) -> Result<Self> {
-        Self::from_bytes_with_registry(bytes, &registry::NoRegistry {})
+        Self::from_bytes_with_ctx(bytes, &TaskContext::default())
     }
 
-    /// Convert `bytes` (the output of [`to_bytes`]) back into an
-    /// object resolving user defined functions with the specified
-    /// `registry`
+    /// Convert `bytes` (the output of [`to_bytes`]) back into an object
+    /// resolving user defined functions with the specified `ctx`.
     ///
     /// [`to_bytes`]: Self::to_bytes
-    fn from_bytes_with_registry(
-        bytes: &[u8],
-        registry: &dyn FunctionRegistry,
-    ) -> Result<Self>;
+    fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self>;

Review Comment:
   This is a breaking API change: we need a `TaskContext` because of the
`parse_expr` change, and I figured it'd make sense to also rename the method to
be more accurate.



##########
datafusion/proto/src/logical_plan/to_proto.rs:
##########
@@ -576,13 +577,30 @@ pub fn serialize_expr(
                 qualifier: qualifier.to_owned().map(|x| x.into()),
             })),
         },
-        Expr::ScalarSubquery(_)
-        | Expr::InSubquery(_)
-        | Expr::Exists { .. }
-        | Expr::OuterReferenceColumn { .. } => {
-            // we would need to add logical plan operators to datafusion.proto 
to support this
-            // see discussion in 
https://github.com/apache/datafusion/issues/2565
-            return Err(Error::General("Proto serialization error: 
Expr::ScalarSubquery(_) | Expr::InSubquery(_) | Expr::Exists { .. } | 
Exp:OuterReferenceColumn not supported".to_string()));
+        Expr::ScalarSubquery(Subquery {
+            subquery,
+            outer_ref_columns,
+            spans: _,
+        }) => protobuf::LogicalExprNode {
+            expr_type: Some(ExprType::ScalarSubquery(Box::new(
+                protobuf::ScalarSubquery {
+                    subquery: Some(Box::new(
+                        
protobuf::LogicalPlanNode::try_from_logical_plan(subquery, codec)
+                            .map_err(|e| Error::General(format!("Proto 
serialization error: Failed to serialize Scalar Subquery: {e}")))?,
+                    )),
+                    outer_ref_columns: serialize_exprs(outer_ref_columns, 
codec)?,
+                },
+            ))),
+        },
+        Expr::OuterReferenceColumn(field, column) => {
+            protobuf::LogicalExprNode {
+                expr_type: 
Some(ExprType::OuterReferenceColumn(OuterReferenceColumn{
+                    field: Some(field.as_ref().try_into()?), column: 
Some(column.into())
+                }))
+            }
+        }
+        Expr::InSubquery(_) | Expr::Exists { .. } => {
+            return Err(Error::NotImplemented("Proto serialization error: 
Expr::InSubquery(_) | Expr::Exists { .. } not supported".to_string()));

Review Comment:
   Actual implementation for serializing



##########
datafusion/proto/src/bytes/mod.rs:
##########
@@ -66,24 +63,19 @@ pub trait Serializeable: Sized {
 
     /// Convert `bytes` (the output of [`to_bytes`]) back into an
     /// object. This will error if the serialized bytes contain any
-    /// user defined functions, in which case use
-    /// [`from_bytes_with_registry`]
+    /// user defined functions, in which case use [`from_bytes_with_ctx`].
     ///
     /// [`to_bytes`]: Self::to_bytes
     /// [`from_bytes_with_registry`]: Self::from_bytes_with_registry
     fn from_bytes(bytes: &[u8]) -> Result<Self> {
-        Self::from_bytes_with_registry(bytes, &registry::NoRegistry {})
+        Self::from_bytes_with_ctx(bytes, &TaskContext::default())

Review Comment:
   The default `TaskContext` has an empty function registry, just like
`NoRegistry`, so this is equivalent. I also deleted `NoRegistry` entirely, since
it was no longer used anywhere else once it was removed here.



##########
datafusion/proto/src/logical_plan/mod.rs:
##########
@@ -1206,10 +1206,10 @@ impl AsLogicalPlan for LogicalPlanNode {
                     logical_plan_type: 
Some(LogicalPlanType::Selection(Box::new(
                         protobuf::SelectionNode {
                             input: Some(Box::new(input)),
-                            expr: Some(serialize_expr(
+                            expr: Some(Box::new(serialize_expr(

Review Comment:
   My changes to `datafusion.proto` seem to also affect some other nodes
during proto code generation, probably because the enum became too large, so
the generated code now boxes seemingly unrelated nodes.



##########
datafusion/proto/src/logical_plan/from_proto.rs:
##########
@@ -254,7 +257,7 @@ impl From<protobuf::NullTreatment> for NullTreatment {
 
 pub fn parse_expr(
     proto: &protobuf::LogicalExprNode,
-    registry: &dyn FunctionRegistry,
+    ctx: &TaskContext,
     codec: &dyn LogicalExtensionCodec,
 ) -> Result<Expr, Error> {

Review Comment:
   Since `Expr::ScalarSubquery` essentially contains a `LogicalPlan`, in order
to parse this logical plan we need to use
`LogicalPlanNode::try_into_logical_plan`, which needs a `TaskContext`. So I had
to expand this API to require a `TaskContext` instead of simply a `dyn
FunctionRegistry` (which `TaskContext` implements). This change propagates to
many other places.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to