alamb commented on PR #18535: URL: https://github.com/apache/datafusion/pull/18535#issuecomment-3593552514
> @alamb If you don't feel comfortable reviewing the PR do you have someone in mind that might be comfortable with it?

I don't have any suggestions at this time. I am mostly lacking the focus time I need to think about how this API will fit into the larger story. Specifically, finding time to review a 4700-line PR in detail is very hard for me.

> If not, do you have any suggestions on how I can get the PR over the finish line?

See below.

> Thanks @alamb! Let me clarify

This is super helpful.

> This PR makes LATERAL work specifically for table functions. The problem is that the current `TableFunctionImpl` trait only accepts constant expressions at planning time:
>
> ```rust
> fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>
> ```
>
> That signature can't support LATERAL where the arguments are column references that vary per input row.

I agree the current signature is lacking. However, I think it is important to have a single API for table functions that can handle both cases:

1. All arguments are constants (what is handled today)
2. The arguments are actually tables (what this PR supports)

> This PR introduces a new `BatchedTableFunctionImpl` trait that receives already-evaluated arrays:

Things I worry about in this PR:

1. It assumes the entire input will be buffered in memory, which is different from most of the rest of DataFusion, which is "streaming" (operates on a batch at a time)
2. A second trait that is very similar to `TableFunctionImpl`, and all the resulting boilerplate / API surface (e.g. registration functions, etc.)

> ```rust
> async fn invoke_batch(&self, args: &[ArrayRef]) -> Result<Stream<Chunk>>
> ```
>
> The key idea is to process multiple rows in a single function call instead of calling the function once per row.
> For example, if you have 3 input rows, instead of calling the function 3 times, you call it once with arrays of length 3:
>
> ```rust
> invoke_batch(&[
>     Int64Array([1, 5, 10]),  // start values from 3 rows
>     Int64Array([3, 7, 12]),  // end values from 3 rows
> ])
> ```
>
> The function returns chunks with explicit row mapping so the executor knows which output rows came from which input rows.

I don't understand this execution model -- my mental model is that a table function receives an arbitrary table as input and produces an arbitrary table as output. So in my mind this corresponds to getting a [`SendableRecordBatchStream`](https://docs.rs/datafusion/latest/datafusion/execution/type.SendableRecordBatchStream.html) as input and returning a [`SendableRecordBatchStream`](https://docs.rs/datafusion/latest/datafusion/execution/type.SendableRecordBatchStream.html) as output.

Since `TableFunctionImpl` already returns a `TableProvider`, and the `TableProvider::scan_with_args` method already gets a `ScanArgs`:

https://github.com/apache/datafusion/blob/7b45934b10ada9f1881d1a38068d55ebb3b34e05/datafusion/catalog/src/table.rs#L335-L339

it seems like it would be a much smaller / simpler API to simply add a new field to `ScanArgs`:

```rust
/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
#[derive(Debug, Clone, Default)]
pub struct ScanArgs<'a> {
    filters: Option<&'a [Expr]>,
    projection: Option<&'a [usize]>,
    limit: Option<usize>,
    /// If invoked as a table function, contains the input stream
    input: Option<SendableRecordBatchStream>, // <---- new field
}
```

> #### How this compares to DuckDB
> DuckDB handles LATERAL differently. Their table functions don't know anything about LATERAL - they just get called with values. The magic happens in the optimizer, which tries to "decorrelate" LATERAL joins into hash joins when possible, falling back to nested loops when it can't.

This is what I aspire to support in DataFusion -- generic decorrelated joins.
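To make the `ScanArgs` idea concrete, here is a minimal stand-in sketch in plain Rust of how one `scan`-style entry point could serve both the constant-argument case and the per-row (LATERAL) case. All types here (`RecordBatchStream`, the simplified `ScanArgs`, the toy `generate_series`-like `scan` function) are simplified stand-ins invented for illustration, not the real DataFusion API:

```rust
/// Stand-in for DataFusion's `SendableRecordBatchStream`; here a
/// "batch" is just a `Vec<i64>` of argument values, one per input row.
type RecordBatchStream = Box<dyn Iterator<Item = Vec<i64>>>;

/// Simplified `ScanArgs` with the proposed new `input` field.
#[derive(Default)]
struct ScanArgs {
    limit: Option<usize>,
    /// If invoked as a table function, the upstream input stream.
    input: Option<RecordBatchStream>,
}

/// Toy `generate_series`-style table function: for each input row `n`,
/// emit the rows `1..=n`. With `input: None` it behaves like the
/// constant-argument case handled today.
fn scan(mut args: ScanArgs, constant_arg: i64) -> Vec<i64> {
    let mut out = Vec::new();
    match args.input.take() {
        // LATERAL case: the argument varies per input row,
        // consumed batch by batch (streaming, not buffered).
        Some(stream) => {
            for batch in stream {
                for n in batch {
                    out.extend(1..=n);
                }
            }
        }
        // Constant case: one scalar argument known at planning time.
        None => out.extend(1..=constant_arg),
    }
    if let Some(limit) = args.limit {
        out.truncate(limit);
    }
    out
}

fn main() {
    // Constant invocation: generate_series(3) -> [1, 2, 3]
    assert_eq!(scan(ScanArgs::default(), 3), vec![1, 2, 3]);

    // LATERAL-style invocation: input rows [2, 3] drive the function,
    // producing 1..=2 followed by 1..=3.
    let input: RecordBatchStream = Box::new(vec![vec![2i64, 3]].into_iter());
    let args = ScanArgs { limit: None, input: Some(input) };
    assert_eq!(scan(args, 0), vec![1, 2, 1, 2, 3]);
    println!("ok");
}
```

The point of the sketch is that a single entry point can branch on `input.is_some()`, so no second trait or separate registration path is needed for the LATERAL case.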
Thus I would like to create a table function API that does not need major rework if/when we support decorrelated joins.

For example, it seems like `LogicalPlan::LateralBatchedTableFunction` is very specialized and wouldn't be used with anything other than a LATERAL table function. For example, how would we plan a query that passed the results of one query to a table function?

```sql
SELECT * from my_table_func(select time, log FROM t)
```

Thank you for your patience. Let me know what you think.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
