gstvg commented on issue #22091:
URL: https://github.com/apache/datafusion/issues/22091#issuecomment-4414082077

   I can think of three solutions:
   
   - Make `HigherOrderUDF::evaluate` and `LambdaArgument::evaluate` async, so that any async UDF within a lambda body can be evaluated before the rest of the body. Its output is appended to the lambda batch, the async UDF is rewritten into a column reference, and then the whole body is evaluated. All higher-order functions would be moved to `AsyncFuncExec` as well.
   - Add a default, blocking execution path for async UDFs that use lambda variables, built on [tokio::task::block_in_place](http://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html) as below. This won't work on WASM, because `block_in_place` requires tokio's multi-threaded runtime:
   
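   ```rust
   use std::sync::Arc;

   use arrow::record_batch::RecordBatch;
   use datafusion_common::{exec_datafusion_err, Result};
   use datafusion_expr::ColumnarValue;
   use tokio::runtime::Handle;
   use tokio::task::block_in_place;

   // Method on the async UDF's expression type (the surrounding `impl` is omitted).
   // It blocks the current worker thread until the async invocation completes.
   // `block_in_place` panics on a `current_thread` runtime, and tokio's
   // multi-threaded runtime is unavailable on WASM.
   fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
       block_in_place(move || {
           // Re-enter the tokio runtime that is already driving this task.
           Handle::try_current()
               .map_err(|err| {
                   exec_datafusion_err!(
                       "{} can't get handle of current tokio runtime: {err}",
                       self.name()
                   )
               })?
               // Run the UDF's async invocation to completion on this thread.
               .block_on(async {
                   invoke_with_args(
                       self,
                       Arc::clone(&self.return_field),
                       batch,
                       Arc::clone(&self.config_options),
                   )
                   .await
               })
       })
   }
   ```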
   - Replace `HigherOrderUDF::evaluate` with the method below, so that `AsyncFuncExec` can handle any async UDF within a lambda body on each of its invocations. Higher-order functions with an async UDF in their body would be moved to `AsyncFuncExec`:
   
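   ```rust
   use std::sync::Arc;

   use arrow::record_batch::RecordBatch;
   use datafusion_common::Result;
   use datafusion_expr::ColumnarValue;
   use datafusion_physical_expr::PhysicalExpr;

   trait HigherOrderUDF {
       fn make_evaluator(&self, args: HigherOrderFunctionArgs) -> Box<dyn Evaluator>;
   }

   enum Step {
       /// AsyncFuncExec will evaluate the async nodes of the expr using the batch,
       /// append their outputs to the batch, rewrite the async nodes to column refs,
       /// then evaluate the whole expr with the extended batch and provide the
       /// result to the next `Evaluator::next` call.
       Partial(RecordBatch, Arc<dyn PhysicalExpr>),
       /// The final result of the higher-order function.
       Complete(ColumnarValue),
   }

   trait Evaluator {
       /// `last_step_output` is `None` on the first call and afterwards holds the
       /// result of evaluating the previous `Step::Partial`.
       fn next(&mut self, last_step_output: Option<ColumnarValue>) -> Result<Step>;
   }
   ```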
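
   Below is a minimal sketch of how a driver similar to `AsyncFuncExec` could consume the `Evaluator`/`Step` protocol of the third option. The types are simplified assumptions. The batch is a single `Vec<i64>` column and the body is an opaque `ExprId`. `TransformEvaluator` is a hypothetical `transform(list, x -> body)` that flattens all lists into one lambda batch. The `eval_partial` closure stands in for the step where `AsyncFuncExec` would await the async nodes, so the evaluator itself never needs to be async.

   ```rust
   // Simplified stand-ins (assumptions, not DataFusion types): a "batch" is one
   // Int64 column and the lambda body is an opaque expression id.
   type Batch = Vec<i64>;
   type Value = Vec<i64>;

   #[derive(Clone, Copy)]
   struct ExprId(usize);

   enum Step {
       /// Ask the driver to evaluate `expr` over `batch` and pass the output back.
       Partial(Batch, ExprId),
       /// Final result of the higher-order function: one list per input row.
       Complete(Vec<Vec<i64>>),
   }

   trait Evaluator {
       fn next(&mut self, last_step_output: Option<Value>) -> Result<Step, String>;
   }

   /// Hypothetical `transform(list, x -> body)`: flatten every list into one
   /// lambda batch, request a single body evaluation, then split the output.
   struct TransformEvaluator {
       lists: Vec<Vec<i64>>,
       body: ExprId,
   }

   impl Evaluator for TransformEvaluator {
       fn next(&mut self, last_step_output: Option<Value>) -> Result<Step, String> {
           match last_step_output {
               // First call: build the lambda batch from all list elements.
               None => Ok(Step::Partial(self.lists.concat(), self.body)),
               // Second call: re-split the body output by the original list lengths.
               Some(values) => {
                   let expected: usize = self.lists.iter().map(Vec::len).sum();
                   if values.len() != expected {
                       return Err(format!("expected {expected} values, got {}", values.len()));
                   }
                   let mut rest = values.as_slice();
                   let mut out = Vec::with_capacity(self.lists.len());
                   for list in &self.lists {
                       let (head, tail) = rest.split_at(list.len());
                       out.push(head.to_vec());
                       rest = tail;
                   }
                   Ok(Step::Complete(out))
               }
           }
       }
   }

   /// Driver loop: `eval_partial` stands in for AsyncFuncExec evaluating (and,
   /// in real code, awaiting) the async nodes of the requested expression.
   fn drive<F>(evaluator: &mut dyn Evaluator, mut eval_partial: F) -> Result<Vec<Vec<i64>>, String>
   where
       F: FnMut(Batch, ExprId) -> Value,
   {
       let mut last = None;
       loop {
           match evaluator.next(last.take())? {
               Step::Partial(batch, expr) => last = Some(eval_partial(batch, expr)),
               Step::Complete(result) => return Ok(result),
           }
       }
   }
   ```

   In this design the higher-order function stays a synchronous state machine. Only the driver touches async code, and because it loops, functions that need more than one `Partial` round work without changes to the driver.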
   
   I'll explore the first option, but I'm open to suggestions.
   cc @rluvaton @LiaCastaneda @comphead @pepijnve @martin-g 
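
   The following is a toy, std-only model of the rewrite the first option describes. Async nodes are collected bottom-up and awaited before the rest of the body. Their outputs are appended to the lambda batch as new columns, and the body is then evaluated synchronously against the extended batch. Every name here is a hypothetical stand-in rather than DataFusion's API. `Expr` is a tiny expression tree and `Batch` is a `Vec` of `i64` columns. `call_async_udf` just doubles its input in place of a real remote call. The busy-polling `block_on` only works because nothing in the sketch actually waits; real code would run on tokio.

   ```rust
   use std::future::Future;
   use std::pin::pin;
   use std::sync::Arc;
   use std::task::{Context, Poll, Wake, Waker};

   /// Toy lambda batch: each inner Vec is one column of i64 values.
   type Batch = Vec<Vec<i64>>;

   enum Expr {
       Column(usize),
       Literal(i64),
       Add(Box<Expr>, Box<Expr>),
       /// Hypothetical async UDF with a single argument.
       AsyncUdf(Box<Expr>),
   }

   /// Synchronous evaluation; async nodes must already be rewritten to columns.
   fn eval(expr: &Expr, batch: &Batch, rows: usize) -> Vec<i64> {
       match expr {
           Expr::Column(i) => batch[*i].clone(),
           Expr::Literal(v) => vec![*v; rows],
           Expr::Add(l, r) => eval(l, batch, rows)
               .into_iter()
               .zip(eval(r, batch, rows))
               .map(|(a, b)| a + b)
               .collect(),
           Expr::AsyncUdf(_) => panic!("async node must be rewritten before eval"),
       }
   }

   /// Post-order rewrite: replace each async node with a reference to a new
   /// column and record its (already rewritten) argument to be awaited later.
   fn rewrite(expr: Expr, next_col: &mut usize, pending: &mut Vec<Expr>) -> Expr {
       match expr {
           Expr::Add(l, r) => {
               let l = rewrite(*l, next_col, pending);
               let r = rewrite(*r, next_col, pending);
               Expr::Add(Box::new(l), Box::new(r))
           }
           Expr::AsyncUdf(arg) => {
               let arg = rewrite(*arg, next_col, pending);
               pending.push(arg);
               let col = *next_col;
               *next_col += 1;
               Expr::Column(col)
           }
           other => other,
       }
   }

   /// Stand-in for a remote call: doubles each value.
   async fn call_async_udf(input: Vec<i64>) -> Vec<i64> {
       input.into_iter().map(|v| v * 2).collect()
   }

   /// Await async nodes first, append their outputs to the lambda batch,
   /// then evaluate the rewritten body synchronously.
   async fn evaluate_lambda_body(body: Expr, mut batch: Batch, rows: usize) -> Vec<i64> {
       let mut next_col = batch.len();
       let mut pending = Vec::new();
       let body = rewrite(body, &mut next_col, &mut pending);
       for arg in &pending {
           let input = eval(arg, &batch, rows);
           batch.push(call_async_udf(input).await);
       }
       eval(&body, &batch, rows)
   }

   struct NoopWaker;
   impl Wake for NoopWaker {
       fn wake(self: Arc<Self>) {}
   }

   /// Busy-polling executor, adequate only for futures that never wait on I/O.
   fn block_on<F: Future>(fut: F) -> F::Output {
       let waker = Waker::from(Arc::new(NoopWaker));
       let mut cx = Context::from_waker(&waker);
       let mut fut = pin!(fut);
       loop {
           if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
               return out;
           }
       }
   }
   ```

   Because the rewrite is post-order, nested calls such as `async_udf(async_udf(x))` also work. The inner call is awaited first, and the outer call's argument refers to the column that holds its output.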


