Jefffrey commented on code in PR #18167:
URL: https://github.com/apache/datafusion/pull/18167#discussion_r2444202553
##########
datafusion/proto/src/bytes/mod.rs:
##########
@@ -104,95 +96,106 @@ impl Serializeable for Expr {
// Until the underlying prost issue (
https://github.com/tokio-rs/prost/issues/736 ) is fixed, we try to
// deserialize the data here and check for errors.
//
- // Need to provide some placeholder registry because the stream may
contain UDFs
- struct PlaceHolderRegistry;
-
- impl FunctionRegistry for PlaceHolderRegistry {
- fn udfs(&self) -> std::collections::HashSet<String> {
- std::collections::HashSet::default()
- }
-
- fn udf(&self, name: &str) ->
Result<Arc<datafusion_expr::ScalarUDF>> {
+ // Need to provide some placeholder codec because the stream may
contain UDFs
+ // (using codec since with TaskContext we can't pass through unknown
udfs; it
+ // requires registering them beforehand)
+ #[derive(Debug)]
+ struct PlaceholderLogicalExtensionCodec {}
+ impl LogicalExtensionCodec for PlaceholderLogicalExtensionCodec {
+ fn try_decode_udf(&self, name: &str, _buf: &[u8]) ->
Result<Arc<ScalarUDF>> {
Ok(Arc::new(create_udf(
name,
vec![],
- arrow::datatypes::DataType::Null,
+ DataType::Null,
Volatility::Immutable,
Arc::new(|_| unimplemented!()),
)))
}
- fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
+ fn try_decode_udaf(
+ &self,
+ name: &str,
+ _buf: &[u8],
+ ) -> Result<Arc<AggregateUDF>> {
Ok(Arc::new(create_udaf(
name,
- vec![arrow::datatypes::DataType::Null],
- Arc::new(arrow::datatypes::DataType::Null),
+ vec![DataType::Null],
+ Arc::new(DataType::Null),
Volatility::Immutable,
Arc::new(|_| unimplemented!()),
Arc::new(vec![]),
)))
}
- fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
+ fn try_decode_udwf(&self, name: &str, _buf: &[u8]) ->
Result<Arc<WindowUDF>> {
Ok(Arc::new(create_udwf(
name,
- arrow::datatypes::DataType::Null,
- Arc::new(arrow::datatypes::DataType::Null),
+ DataType::Null,
+ Arc::new(DataType::Null),
Volatility::Immutable,
Arc::new(|| unimplemented!()),
)))
}
- fn register_udaf(
- &mut self,
- _udaf: Arc<AggregateUDF>,
- ) -> Result<Option<Arc<AggregateUDF>>> {
- datafusion_common::internal_err!(
- "register_udaf called in Placeholder Registry!"
- )
- }
- fn register_udf(
- &mut self,
- _udf: Arc<datafusion_expr::ScalarUDF>,
- ) -> Result<Option<Arc<datafusion_expr::ScalarUDF>>> {
- datafusion_common::internal_err!(
- "register_udf called in Placeholder Registry!"
- )
- }
- fn register_udwf(
- &mut self,
- _udaf: Arc<WindowUDF>,
- ) -> Result<Option<Arc<WindowUDF>>> {
- datafusion_common::internal_err!(
- "register_udwf called in Placeholder Registry!"
- )
+
+ fn try_decode(
+ &self,
+ _buf: &[u8],
+ _inputs: &[LogicalPlan],
+ _ctx: &TaskContext,
+ ) -> Result<Extension> {
+ unimplemented!()
}
- fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
- vec![]
+ fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) ->
Result<()> {
+ unimplemented!()
}
- fn udafs(&self) -> std::collections::HashSet<String> {
- std::collections::HashSet::default()
+ fn try_decode_table_provider(
+ &self,
+ _buf: &[u8],
+ _table_ref: &TableReference,
+ _schema: SchemaRef,
+ _ctx: &TaskContext,
+ ) -> Result<Arc<dyn TableProvider>> {
+ unimplemented!()
}
- fn udwfs(&self) -> std::collections::HashSet<String> {
- std::collections::HashSet::default()
+ fn try_encode_table_provider(
+ &self,
+ _table_ref: &TableReference,
+ _node: Arc<dyn TableProvider>,
+ _buf: &mut Vec<u8>,
+ ) -> Result<()> {
+ unimplemented!()
}
}
- Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;
+
+ // Copied from from_bytes_with_ctx below but with placeholder registry
instead of
+ // default.
+ {
+ let bytes: &[u8] = &bytes;
+ let protobuf =
protobuf::LogicalExprNode::decode(bytes).map_err(|e| {
+ plan_datafusion_err!("Error decoding expr as protobuf: {e}")
+ })?;
+
+ let extension_codec = PlaceholderLogicalExtensionCodec {};
+ logical_plan::from_proto::parse_expr(
+ &protobuf,
+ &TaskContext::default(),
+ &extension_codec,
+ )
+ .map_err(|e| plan_datafusion_err!("Error parsing protobuf into
Expr: {e}"))?;
+ }
Review Comment:
   This is a bit ugly, but since we're now requiring
`TaskContext` instead of `dyn FunctionRegistry`, there was no easy way within
`TaskContext` to simply pass through arbitrary UDFs/UDAFs/UDWFs the way
`PlaceHolderRegistry` was meant to. Instead, we achieve the same behaviour
via the codec. So I'm copying the `from_bytes_with_ctx` code into here to
customize the codec we pass in.
##########
datafusion/proto/src/logical_plan/from_proto.rs:
##########
@@ -635,13 +635,33 @@ pub fn parse_expr(
Some(data_type.try_into()?),
))),
},
+ ExprType::ScalarSubquery(scalar_subquery) => {
+ let subquery = scalar_subquery
+ .subquery
+ .as_ref()
+ .ok_or_else(|| Error::required("subquery"))?;
+ Ok(Expr::ScalarSubquery(Subquery {
+ subquery: Arc::new(subquery.try_into_logical_plan(ctx,
codec)?),
+ outer_ref_columns: parse_exprs(
+ &scalar_subquery.outer_ref_columns,
+ ctx,
+ codec,
+ )?,
+ spans: Spans::new(),
+ }))
+ }
+ ExprType::OuterReferenceColumn(OuterReferenceColumn { field, column })
=> {
+ let column = column.to_owned().ok_or_else(||
Error::required("column"))?;
+ let field = field.as_ref().required("field")?;
+ Ok(Expr::OuterReferenceColumn(Arc::new(field), column.into()))
+ }
Review Comment:
Actual implementation for deserializing
##########
datafusion/proto/src/bytes/mod.rs:
##########
@@ -66,24 +63,19 @@ pub trait Serializeable: Sized {
/// Convert `bytes` (the output of [`to_bytes`]) back into an
/// object. This will error if the serialized bytes contain any
- /// user defined functions, in which case use
- /// [`from_bytes_with_registry`]
+ /// user defined functions, in which case use [`from_bytes_with_ctx`].
///
/// [`to_bytes`]: Self::to_bytes
/// [`from_bytes_with_registry`]: Self::from_bytes_with_registry
fn from_bytes(bytes: &[u8]) -> Result<Self> {
- Self::from_bytes_with_registry(bytes, ®istry::NoRegistry {})
+ Self::from_bytes_with_ctx(bytes, &TaskContext::default())
}
- /// Convert `bytes` (the output of [`to_bytes`]) back into an
- /// object resolving user defined functions with the specified
- /// `registry`
+ /// Convert `bytes` (the output of [`to_bytes`]) back into an object
+ /// resolving user defined functions with the specified `ctx`.
///
/// [`to_bytes`]: Self::to_bytes
- fn from_bytes_with_registry(
- bytes: &[u8],
- registry: &dyn FunctionRegistry,
- ) -> Result<Self>;
+ fn from_bytes_with_ctx(bytes: &[u8], ctx: &TaskContext) -> Result<Self>;
Review Comment:
   This is a breaking API change, since we now need a `TaskContext` because of
the `parse_expr` change; I figured it'd also make sense to rename the method so
it's more accurate.
##########
datafusion/proto/src/logical_plan/to_proto.rs:
##########
@@ -576,13 +577,30 @@ pub fn serialize_expr(
qualifier: qualifier.to_owned().map(|x| x.into()),
})),
},
- Expr::ScalarSubquery(_)
- | Expr::InSubquery(_)
- | Expr::Exists { .. }
- | Expr::OuterReferenceColumn { .. } => {
- // we would need to add logical plan operators to datafusion.proto
to support this
- // see discussion in
https://github.com/apache/datafusion/issues/2565
- return Err(Error::General("Proto serialization error:
Expr::ScalarSubquery(_) | Expr::InSubquery(_) | Expr::Exists { .. } |
Exp:OuterReferenceColumn not supported".to_string()));
+ Expr::ScalarSubquery(Subquery {
+ subquery,
+ outer_ref_columns,
+ spans: _,
+ }) => protobuf::LogicalExprNode {
+ expr_type: Some(ExprType::ScalarSubquery(Box::new(
+ protobuf::ScalarSubquery {
+ subquery: Some(Box::new(
+
protobuf::LogicalPlanNode::try_from_logical_plan(subquery, codec)
+ .map_err(|e| Error::General(format!("Proto
serialization error: Failed to serialize Scalar Subquery: {e}")))?,
+ )),
+ outer_ref_columns: serialize_exprs(outer_ref_columns,
codec)?,
+ },
+ ))),
+ },
+ Expr::OuterReferenceColumn(field, column) => {
+ protobuf::LogicalExprNode {
+ expr_type:
Some(ExprType::OuterReferenceColumn(OuterReferenceColumn{
+ field: Some(field.as_ref().try_into()?), column:
Some(column.into())
+ }))
+ }
+ }
+ Expr::InSubquery(_) | Expr::Exists { .. } => {
+ return Err(Error::NotImplemented("Proto serialization error:
Expr::InSubquery(_) | Expr::Exists { .. } not supported".to_string()));
Review Comment:
Actual implementation for serializing
##########
datafusion/proto/src/bytes/mod.rs:
##########
@@ -66,24 +63,19 @@ pub trait Serializeable: Sized {
/// Convert `bytes` (the output of [`to_bytes`]) back into an
/// object. This will error if the serialized bytes contain any
- /// user defined functions, in which case use
- /// [`from_bytes_with_registry`]
+ /// user defined functions, in which case use [`from_bytes_with_ctx`].
///
/// [`to_bytes`]: Self::to_bytes
/// [`from_bytes_with_registry`]: Self::from_bytes_with_registry
fn from_bytes(bytes: &[u8]) -> Result<Self> {
- Self::from_bytes_with_registry(bytes, ®istry::NoRegistry {})
+ Self::from_bytes_with_ctx(bytes, &TaskContext::default())
Review Comment:
Default `TaskContext` will have an empty function registry just like
`NoRegistry` so this is equivalent; I also deleted `NoRegistry` entirely as it
wasn't used anywhere else after it was removed here.
##########
datafusion/proto/src/logical_plan/mod.rs:
##########
@@ -1206,10 +1206,10 @@ impl AsLogicalPlan for LogicalPlanNode {
logical_plan_type:
Some(LogicalPlanType::Selection(Box::new(
protobuf::SelectionNode {
input: Some(Box::new(input)),
- expr: Some(serialize_expr(
+ expr: Some(Box::new(serialize_expr(
Review Comment:
   My changes to `datafusion.proto` seem to also affect some other nodes
during the generation of the proto code, probably because the enum became
too big, so it now boxes seemingly unrelated nodes.
##########
datafusion/proto/src/logical_plan/from_proto.rs:
##########
@@ -254,7 +257,7 @@ impl From<protobuf::NullTreatment> for NullTreatment {
pub fn parse_expr(
proto: &protobuf::LogicalExprNode,
- registry: &dyn FunctionRegistry,
+ ctx: &TaskContext,
codec: &dyn LogicalExtensionCodec,
) -> Result<Expr, Error> {
Review Comment:
   Since `Expr::ScalarSubquery` essentially contains a `LogicalPlan`, in order
to parse this logical plan we need to use
`LogicalPlanNode::try_into_logical_plan`, which needs a `TaskContext`. So I had
to expand this API to require a `TaskContext` instead of simply a `dyn
FunctionRegistry` (which `TaskContext` implements). This change propagates to
many other places.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]