Jefffrey commented on code in PR #18167:
URL: https://github.com/apache/datafusion/pull/18167#discussion_r2444202553
##########
datafusion/proto/src/bytes/mod.rs:
##########
@@ -104,95 +96,106 @@ impl Serializeable for Expr {
// Until the underlying prost issue (
https://github.com/tokio-rs/prost/issues/736 ) is fixed, we try to
// deserialize the data here and check for errors.
//
- // Need to provide some placeholder registry because the stream may
contain UDFs
- struct PlaceHolderRegistry;
-
- impl FunctionRegistry for PlaceHolderRegistry {
- fn udfs(&self) -> std::collections::HashSet<String> {
- std::collections::HashSet::default()
- }
-
- fn udf(&self, name: &str) ->
Result<Arc<datafusion_expr::ScalarUDF>> {
+ // Need to provide some placeholder codec because the stream may
contain UDFs
+ // (using codec since with TaskContext we can't pass through unknown
udfs; it
+ // requires registering them beforehand)
+ #[derive(Debug)]
+ struct PlaceholderLogicalExtensionCodec {}
+ impl LogicalExtensionCodec for PlaceholderLogicalExtensionCodec {
+ fn try_decode_udf(&self, name: &str, _buf: &[u8]) ->
Result<Arc<ScalarUDF>> {
Ok(Arc::new(create_udf(
name,
vec![],
- arrow::datatypes::DataType::Null,
+ DataType::Null,
Volatility::Immutable,
Arc::new(|_| unimplemented!()),
)))
}
- fn udaf(&self, name: &str) -> Result<Arc<AggregateUDF>> {
+ fn try_decode_udaf(
+ &self,
+ name: &str,
+ _buf: &[u8],
+ ) -> Result<Arc<AggregateUDF>> {
Ok(Arc::new(create_udaf(
name,
- vec![arrow::datatypes::DataType::Null],
- Arc::new(arrow::datatypes::DataType::Null),
+ vec![DataType::Null],
+ Arc::new(DataType::Null),
Volatility::Immutable,
Arc::new(|_| unimplemented!()),
Arc::new(vec![]),
)))
}
- fn udwf(&self, name: &str) -> Result<Arc<WindowUDF>> {
+ fn try_decode_udwf(&self, name: &str, _buf: &[u8]) ->
Result<Arc<WindowUDF>> {
Ok(Arc::new(create_udwf(
name,
- arrow::datatypes::DataType::Null,
- Arc::new(arrow::datatypes::DataType::Null),
+ DataType::Null,
+ Arc::new(DataType::Null),
Volatility::Immutable,
Arc::new(|| unimplemented!()),
)))
}
- fn register_udaf(
- &mut self,
- _udaf: Arc<AggregateUDF>,
- ) -> Result<Option<Arc<AggregateUDF>>> {
- datafusion_common::internal_err!(
- "register_udaf called in Placeholder Registry!"
- )
- }
- fn register_udf(
- &mut self,
- _udf: Arc<datafusion_expr::ScalarUDF>,
- ) -> Result<Option<Arc<datafusion_expr::ScalarUDF>>> {
- datafusion_common::internal_err!(
- "register_udf called in Placeholder Registry!"
- )
- }
- fn register_udwf(
- &mut self,
- _udaf: Arc<WindowUDF>,
- ) -> Result<Option<Arc<WindowUDF>>> {
- datafusion_common::internal_err!(
- "register_udwf called in Placeholder Registry!"
- )
+
+ fn try_decode(
+ &self,
+ _buf: &[u8],
+ _inputs: &[LogicalPlan],
+ _ctx: &TaskContext,
+ ) -> Result<Extension> {
+ unimplemented!()
}
- fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> {
- vec![]
+ fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) ->
Result<()> {
+ unimplemented!()
}
- fn udafs(&self) -> std::collections::HashSet<String> {
- std::collections::HashSet::default()
+ fn try_decode_table_provider(
+ &self,
+ _buf: &[u8],
+ _table_ref: &TableReference,
+ _schema: SchemaRef,
+ _ctx: &TaskContext,
+ ) -> Result<Arc<dyn TableProvider>> {
+ unimplemented!()
}
- fn udwfs(&self) -> std::collections::HashSet<String> {
- std::collections::HashSet::default()
+ fn try_encode_table_provider(
+ &self,
+ _table_ref: &TableReference,
+ _node: Arc<dyn TableProvider>,
+ _buf: &mut Vec<u8>,
+ ) -> Result<()> {
+ unimplemented!()
}
}
- Expr::from_bytes_with_registry(&bytes, &PlaceHolderRegistry)?;
+
+ // Copied from from_bytes_with_ctx below but with placeholder registry
instead of
+ // default.
+ {
+ let bytes: &[u8] = &bytes;
+ let protobuf =
protobuf::LogicalExprNode::decode(bytes).map_err(|e| {
+ plan_datafusion_err!("Error decoding expr as protobuf: {e}")
+ })?;
+
+ let extension_codec = PlaceholderLogicalExtensionCodec {};
+ logical_plan::from_proto::parse_expr(
+ &protobuf,
+ &TaskContext::default(),
+ &extension_codec,
+ )
+ .map_err(|e| plan_datafusion_err!("Error parsing protobuf into
Expr: {e}"))?;
+ }
Review Comment:
This is a bit ugly, but it's necessary: since we now require
`TaskContext` instead of `dyn FunctionRegistry`, there is no easy way within
`TaskContext` to pass through arbitrary UDFs/UDAFs/UDWFs the way
`PlaceHolderRegistry` did. Instead, we achieve the same behaviour
via the codec, so I'm copying the `from_bytes_with_ctx` code here in order to
customize the codec we pass in.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]