mach-kernel commented on code in PR #19118:
URL: https://github.com/apache/datafusion/pull/19118#discussion_r2602675540
##########
datafusion/proto/tests/cases/roundtrip_physical_plan.rs:
##########
@@ -2263,3 +2265,100 @@ async fn roundtrip_listing_table_with_schema_metadata()
-> Result<()> {
roundtrip_test(plan)
}
+
+#[tokio::test]
+async fn roundtrip_async_func_exec() -> Result<()> {
+ #[derive(Debug, PartialEq, Eq, Hash)]
+ struct TestAsyncUDF {
+ signature: Signature,
+ }
+
+ impl TestAsyncUDF {
+ fn new() -> Self {
+ Self {
+ signature: Signature::exact(vec![DataType::Int64],
Volatility::Volatile),
+ }
+ }
+ }
+
+ impl ScalarUDFImpl for TestAsyncUDF {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "test_async_udf"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ Ok(DataType::Int64)
+ }
+
+ fn invoke_with_args(&self, _args: ScalarFunctionArgs) ->
Result<ColumnarValue> {
+ not_impl_err!("Must call from `invoke_async_with_args`")
+ }
+ }
+
+ #[async_trait::async_trait]
+ impl AsyncScalarUDFImpl for TestAsyncUDF {
+ async fn invoke_async_with_args(
+ &self,
+ args: ScalarFunctionArgs,
+ ) -> Result<ColumnarValue> {
+ Ok(args.args[0].clone())
+ }
+ }
+
+ #[derive(Debug)]
+ struct TestAsyncUDFPhysicalCodec {}
+ impl PhysicalExtensionCodec for TestAsyncUDFPhysicalCodec {
+ fn try_decode(
+ &self,
+ _buf: &[u8],
+ _inputs: &[Arc<dyn ExecutionPlan>],
+ _ctx: &TaskContext,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ not_impl_err!(
+ "TestAsyncUDFPhysicalCodec should not be called to decode an
extension"
+ )
+ }
+
+ fn try_encode(
+ &self,
+ _node: Arc<dyn ExecutionPlan>,
+ _buf: &mut Vec<u8>,
+ ) -> Result<()> {
+ Ok(())
+ }
+
+ fn try_decode_udf(&self, name: &str, _buf: &[u8]) ->
Result<Arc<ScalarUDF>> {
+ if name == "test_async_udf" {
+ Ok(Arc::new(TestAsyncUDF::new().into()))
+ } else {
+ not_impl_err!("TestAsyncUDFPhysicalCodec unrecognized UDF
{name}")
+ }
+ }
+
+ fn try_encode_udf(&self, _node: &ScalarUDF, _buf: &mut Vec<u8>) ->
Result<()> {
+ Ok(())
+ }
+ }
+
+ let ctx = SessionContext::new();
+ let async_udf = AsyncScalarUDF::new(Arc::new(TestAsyncUDF::new()));
+ ctx.register_udf(async_udf.into_scalar_udf());
+
+ let logical_plan = ctx
+ .state()
+ .create_logical_plan("select test_async_udf(1)")
+ .await?;
+ let physical_plan = ctx.state().create_physical_plan(&logical_plan).await?;
+ let codec = TestAsyncUDFPhysicalCodec {};
+ roundtrip_test_and_return(physical_plan, &ctx, &codec)?;
Review Comment:
My mistake -- I was using `roundtrip_test` without the context (which did
not have the UDF registered). Updated 😄!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]