alamb commented on PR #18535:
URL: https://github.com/apache/datafusion/pull/18535#issuecomment-3593552514

   > @alamb If you don't feel comfortable reviewing the PR do you have someone 
in mind that might be comfortable with it?
   
   I don't have any suggestions at this time. I am mostly lacking the focus time I need to think about how this API will fit into the larger story. Specifically, finding time to review a 4700-line PR in detail is very hard for me.
   
   > If not, do you have any suggestions on how I can get the PR over the 
finish line?
   
   See below
   
   
   
   > Thanks @alamb! Let me clarify
   
   This is super helpful
   
   
   > This PR makes LATERAL work specifically for table functions. The problem 
is that the current `TableFunctionImpl` trait only accepts constant expressions 
at planning time:
   > 
   > ```rust
   > fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>
   > ```
   > 
   > That signature can't support LATERAL where the arguments are column 
references that vary per input row.
   
   I agree the current signature is lacking 
   
   However, I think it is important to make a single API for TableFunctions 
that can handle both cases:
   1. All arguments are constants (what is handled today)
   2. The arguments are actually tables (what this PR supports)
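
   To make the distinction concrete, here is a rough, purely illustrative sketch of what a unified argument type might look like -- `TableFunctionArg` and its variants are hypothetical names, not existing DataFusion APIs, and `i64` / `Vec<i64>` stand in for real `Expr`s and Arrow arrays:

   ```rust
   // Hypothetical sketch only: one argument type covering both cases,
   // so a single TableFunction API can handle constants and table inputs.
   #[derive(Debug)]
   enum TableFunctionArg {
       // Case 1: a constant known at planning time
       Constant(i64),
       // Case 2: a column/table input whose values vary per row
       Column(Vec<i64>),
   }

   fn describe(arg: &TableFunctionArg) -> String {
       match arg {
           TableFunctionArg::Constant(v) => format!("constant({v})"),
           TableFunctionArg::Column(c) => format!("column(len={})", c.len()),
       }
   }

   fn main() {
       let args = vec![
           TableFunctionArg::Constant(3),
           TableFunctionArg::Column(vec![1, 5, 10]),
       ];
       for a in &args {
           println!("{}", describe(a));
       }
   }
   ```

   The point is just that one API surface covers both cases, rather than two parallel traits.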
   
   
   > 
   > This PR introduces a new `BatchedTableFunctionImpl` trait that receives 
already-evaluated arrays:
   
   Things I worry about in this PR:
   1. It assumes the entire input will be buffered in memory, which differs from most of the rest of DataFusion, which is "streaming" (operates on one batch at a time)
   2. A second trait that is very similar to `TableFunctionImpl`, with all the resulting boilerplate / API surface (e.g. registration functions, etc.)
   
   > 
   > ```rust
   > async fn invoke_batch(&self, args: &[ArrayRef]) -> Result<Stream<Chunk>>
   > ```
   > 
   > The key idea is to process multiple rows in a single function call instead 
of calling the function once per row. For example, if you have 3 input rows, 
instead of calling the function 3 times, you call it once with arrays of length 
3:
   > 
   > ```rust
   > invoke_batch(&[
   >     Int64Array([1, 5, 10]),  // start values from 3 rows
   >     Int64Array([3, 7, 12])   // end values from 3 rows
   > ])
   > ```
   > 
   > The function returns chunks with explicit row mapping so the executor 
knows which output rows came from which input rows.
   
   I don't understand this execution model -- my mental model is that a table function receives an arbitrary table as input and produces an arbitrary table as output.
   
   So in my mind this corresponds to getting a 
[`SendableRecordBatchStream`](https://docs.rs/datafusion/latest/datafusion/execution/type.SendableRecordBatchStream.html)
 as input and returning a 
[`SendableRecordBatchStream`](https://docs.rs/datafusion/latest/datafusion/execution/type.SendableRecordBatchStream.html)
 as output
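
   Here is a minimal, self-contained sketch of that mental model, using `Vec<i64>` batches as a stand-in for Arrow `RecordBatch`es and plain iterators instead of the async `SendableRecordBatchStream` (the function name `generate_series_per_row` is made up for illustration):

   ```rust
   // Illustrative only: a "table function" as stream-in / stream-out,
   // consuming input batches one at a time and producing output batches,
   // which preserves the streaming property (no full buffering).
   type Batch = Vec<i64>;

   fn generate_series_per_row(
       input: impl Iterator<Item = Batch>,
   ) -> impl Iterator<Item = Batch> {
       input.map(|batch| {
           // For each input value n, emit 1..=n into the output batch.
           batch.into_iter().flat_map(|n| 1..=n).collect()
       })
   }

   fn main() {
       let input = vec![vec![2, 3], vec![1]].into_iter();
       let output: Vec<Batch> = generate_series_per_row(input).collect();
       println!("{output:?}"); // [[1, 2, 1, 2, 3], [1]]
   }
   ```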
   
   
   Since `TableFunctionImpl` already returns a `TableProvider`, and the `TableProvider::scan_with_args` method already gets a `ScanArgs`:
   
https://github.com/apache/datafusion/blob/7b45934b10ada9f1881d1a38068d55ebb3b34e05/datafusion/catalog/src/table.rs#L335-L339
   
   It seems like it would be a much smaller / simpler API to simply add a new field to `ScanArgs`:
   
   ```rust
   /// Arguments for scanning a table with [`TableProvider::scan_with_args`].
   #[derive(Debug, Clone, Default)]
   pub struct ScanArgs<'a> {
       filters: Option<&'a [Expr]>,
       projection: Option<&'a [usize]>,
       limit: Option<usize>,
       /// If invoked as a table function, contains the input stream
       input: Option<SendableRecordBatchStream>,  // <---- new field
   }
   ```
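
   To illustrate how a provider might use that field (with `MockScanArgs` and `Vec<i64>` as stand-ins for the real `ScanArgs` / `SendableRecordBatchStream`, not actual DataFusion types):

   ```rust
   // Illustrative sketch: a scan that branches on whether an input
   // stream was supplied via the new field.
   struct MockScanArgs {
       limit: Option<usize>,
       input: Option<Vec<i64>>, // stand-in for SendableRecordBatchStream
   }

   fn scan(args: MockScanArgs) -> Vec<i64> {
       let rows = match args.input {
           // Invoked as a lateral table function: derive output from input rows.
           Some(input) => input.into_iter().map(|v| v * 10).collect(),
           // Plain table-function call with constant arguments only.
           None => vec![1, 2, 3],
       };
       match args.limit {
           Some(n) => rows.into_iter().take(n).collect(),
           None => rows,
       }
   }

   fn main() {
       let lateral = scan(MockScanArgs {
           limit: Some(2),
           input: Some(vec![1, 5, 10]),
       });
       println!("{lateral:?}"); // [10, 50]
   }
   ```

   Existing providers that ignore the new field would keep working unchanged, which is part of why this seems like a smaller API change.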
   
   > 
   > #### How this compares to DuckDB
   > DuckDB handles LATERAL differently. Their table functions don't know 
anything about LATERAL - they just get called with values. The magic happens in 
the optimizer which tries to "decorrelate" LATERAL joins into hash joins when 
possible, falling back to nested loops when it can't.
   
   This is what I aspire to support in DataFusion -- generic decorrelated joins. Thus I would like to create a `TableFunction` API that does not need major rework if/when we support decorrelated joins.
   
   For example, it seems like `LogicalPlan::LateralBatchedTableFunction` is 
very specialized and wouldn't be used with anything other than a LATERAL table 
function. 
   
   Also, how would we plan a query that passes the results of one query to a table function?
   ```
   SELECT * FROM my_table_func(SELECT time, log FROM t)
   ```
   
   Thank you for your patience. Let me know what you think.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

