Jefffrey commented on code in PR #19118:
URL: https://github.com/apache/datafusion/pull/19118#discussion_r2595901975
##########
datafusion/proto/Cargo.toml:
##########
@@ -49,6 +49,7 @@ avro = ["datafusion-datasource-avro", "datafusion-common/avro"]
[dependencies]
arrow = { workspace = true }
+async-trait = { workspace = true }
Review Comment:
Can we make this a dev dependency only?
##########
datafusion/physical-plan/src/async_func.rs:
##########
@@ -100,6 +100,14 @@ impl AsyncFuncExec {
input.boundedness(),
))
}
+
+ pub fn async_exprs(&self) -> Vec<Arc<AsyncFuncExpr>> {
+ self.async_exprs.clone()
+ }
+
+ pub fn input(&self) -> Arc<dyn ExecutionPlan> {
+ Arc::clone(&self.input)
+ }
Review Comment:
```suggestion
pub fn async_exprs(&self) -> &[Arc<AsyncFuncExpr>] {
&self.async_exprs
}
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}
```
Returning references lets the caller decide whether they want to clone.
##########
datafusion/proto/tests/cases/roundtrip_physical_plan.rs:
##########
@@ -2263,3 +2265,100 @@ async fn roundtrip_listing_table_with_schema_metadata() -> Result<()> {
roundtrip_test(plan)
}
+
+#[tokio::test]
+async fn roundtrip_async_func_exec() -> Result<()> {
+ #[derive(Debug, PartialEq, Eq, Hash)]
+ struct TestAsyncUDF {
+ signature: Signature,
+ }
+
+ impl TestAsyncUDF {
+ fn new() -> Self {
+ Self {
+ signature: Signature::exact(vec![DataType::Int64],
Volatility::Volatile),
+ }
+ }
+ }
+
+ impl ScalarUDFImpl for TestAsyncUDF {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "test_async_udf"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ Ok(DataType::Int64)
+ }
+
+ fn invoke_with_args(&self, _args: ScalarFunctionArgs) ->
Result<ColumnarValue> {
+ not_impl_err!("Must call from `invoke_async_with_args`")
+ }
+ }
+
+ #[async_trait::async_trait]
+ impl AsyncScalarUDFImpl for TestAsyncUDF {
+ async fn invoke_async_with_args(
+ &self,
+ args: ScalarFunctionArgs,
+ ) -> Result<ColumnarValue> {
+ Ok(args.args[0].clone())
+ }
+ }
+
+ #[derive(Debug)]
+ struct TestAsyncUDFPhysicalCodec {}
+ impl PhysicalExtensionCodec for TestAsyncUDFPhysicalCodec {
+ fn try_decode(
+ &self,
+ _buf: &[u8],
+ _inputs: &[Arc<dyn ExecutionPlan>],
+ _ctx: &TaskContext,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ not_impl_err!(
+ "TestAsyncUDFPhysicalCodec should not be called to decode an
extension"
+ )
+ }
+
+ fn try_encode(
+ &self,
+ _node: Arc<dyn ExecutionPlan>,
+ _buf: &mut Vec<u8>,
+ ) -> Result<()> {
+ Ok(())
+ }
+
+ fn try_decode_udf(&self, name: &str, _buf: &[u8]) ->
Result<Arc<ScalarUDF>> {
+ if name == "test_async_udf" {
+ Ok(Arc::new(TestAsyncUDF::new().into()))
+ } else {
+ not_impl_err!("TestAsyncUDFPhysicalCodec unrecognized UDF
{name}")
+ }
+ }
+
+ fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) ->
Result<()> {
+ Ok(())
+ }
+ }
+
+ let ctx = SessionContext::new();
+ let async_udf = AsyncScalarUDF::new(Arc::new(TestAsyncUDF::new()));
+ ctx.register_udf(async_udf.into_scalar_udf());
+
+ let logical_plan = ctx
+ .state()
+ .create_logical_plan("select test_async_udf(1)")
+ .await?;
+ let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;
+ let codec = TestAsyncUDFPhysicalCodec {};
+ roundtrip_test_and_return(physical_plan, &ctx, &codec)?;
Review Comment:
```suggestion
let plan = ctx
.sql("select test_async_udf(1)")
.await?
.create_physical_plan()
.await?;
roundtrip_test_with_context(plan, &ctx)?;
```
This simplifies the test a little.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]