bubulalabu opened a new pull request, #18535:
URL: https://github.com/apache/datafusion/pull/18535
# Batched Table Functions with LATERAL Join Support
**Note: This is a draft PR for gathering feedback on the approach. It works, but it is not polished yet.**
I'd like to hear opinions on whether this is the right path forward before I start finalizing the PR.
## Which issue does this PR close?
Closes #18121.
---
## The Problem
DataFusion's current table function API (`TableFunctionImpl`) requires
constant arguments at planning time and returns a `TableProvider`. This
prevents LATERAL joins where table functions need to correlate with outer query
rows.
```sql
-- Current limitation: Cannot do this
SELECT * FROM metrics m
CROSS JOIN LATERAL generate_series(m.start_time, m.end_time) AS t(ts);
```
### Current Architecture Limitations
```rust
// Current API
trait TableFunctionImpl {
    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
}
```
**Problems**:
- Planning-time evaluation: Arguments must be constants, can't reference
table columns
- No LATERAL support: Can't correlate function calls with input rows
- Separate TableProvider per call: Inefficient for batch processing
---
## Proposed Solution
Introduce a new trait that processes arrays of arguments (batched
invocation) and returns output with input row mapping for correlation:
```rust
#[async_trait]
trait BatchedTableFunctionImpl {
    async fn invoke_batch(
        &self,
        args: &[ArrayRef], // Already-evaluated arrays
        projection: Option<&[usize]>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<BatchResultStream>;
}

// Results map back to input rows for LATERAL
struct BatchResultChunk {
    output: RecordBatch,
    // For each output row, the index of the input row that produced it
    input_row_indices: Vec<u32>,
}
```
### Key Design Decisions
**1. Arrays as Arguments**
- Like `ScalarUDF` - receives evaluated data, not planning-time expressions
- Enables batched processing: process multiple function calls in one
`invoke_batch`
- Natural fit for LATERAL: array elements correspond to input rows
**2. Async API**
- Matches `TableProvider::scan()` async pattern
**3. Direct Pushdown Parameters**
- Matches `TableProvider::scan()` pattern
- Function gets projection/filter/limit directly in `invoke_batch()`
**4. Streaming Results**
- Returns `Stream<BatchResultChunk>` for memory-bounded execution
- Chunks include `input_row_indices` for efficient LATERAL join
implementation
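
To make decisions 1 and 4 concrete, here is a minimal, std-only sketch of a batched `generate_series(start, end)` that yields bounded chunks lazily. It is not the code in this PR: `SketchChunk` and `SeriesChunks` are hypothetical names, a plain `Vec<i64>` stands in for the Arrow `RecordBatch`, and a synchronous `Iterator` stands in for the async `BatchResultStream`.

```rust
/// Hypothetical stand-in for `BatchResultChunk`; `Vec<i64>` replaces `RecordBatch`.
#[derive(Debug, PartialEq)]
struct SketchChunk {
    output: Vec<i64>,
    input_row_indices: Vec<u32>,
}

/// Lazily yields chunks of at most `max_rows` rows for a batched
/// `generate_series(start, end)` call (inclusive bounds, step 1).
struct SeriesChunks<'a> {
    starts: &'a [i64],
    ends: &'a [i64],
    row: usize,      // input row currently being expanded
    cursor: i64,     // next value to emit for that row
    max_rows: usize, // upper bound on rows per chunk
}

impl<'a> SeriesChunks<'a> {
    fn new(starts: &'a [i64], ends: &'a [i64], max_rows: usize) -> Self {
        assert_eq!(starts.len(), ends.len(), "argument arrays must align");
        assert!(max_rows > 0, "chunks must hold at least one row");
        let cursor = starts.first().copied().unwrap_or(0);
        Self { starts, ends, row: 0, cursor, max_rows }
    }

    /// Moves to the next input row and resets the cursor to its start value.
    fn advance_row(&mut self) {
        self.row += 1;
        if let Some(&start) = self.starts.get(self.row) {
            self.cursor = start;
        }
    }
}

impl Iterator for SeriesChunks<'_> {
    type Item = SketchChunk;

    fn next(&mut self) -> Option<Self::Item> {
        let mut output = Vec::new();
        let mut input_row_indices = Vec::new();
        while self.row < self.starts.len() && output.len() < self.max_rows {
            if self.cursor > self.ends[self.row] {
                // This input row is exhausted (or its range was empty).
                self.advance_row();
                continue;
            }
            output.push(self.cursor);
            input_row_indices.push(self.row as u32);
            match self.cursor.checked_add(1) {
                Some(n) => self.cursor = n,
                None => self.advance_row(), // reached i64::MAX for this row
            }
        }
        if output.is_empty() {
            None
        } else {
            Some(SketchChunk { output, input_row_indices })
        }
    }
}
```

Because a chunk is cut by size rather than by input row, one chunk can span several input rows. That is why the sketch records an index per output row instead of one index per chunk.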
---
## Architecture Overview
### Execution Flow Example: generate_series(start, end)
**Input batch**:
```
┌────┬───────┬─────┐
│ id │ start │ end │
├────┼───────┼─────┤
│ 1 │ 10 │ 12 │
│ 2 │ 20 │ 22 │
└────┴───────┴─────┘
```
**Evaluate args** → `start=[10, 20]`, `end=[12, 22]`
**invoke_batch** → Stream of chunks:
```rust
Chunk {
    output: [10, 11, 12],        // values generated for start=10, end=12
    input_row_indices: [0, 0, 0] // all from input row 0
}
Chunk {
    output: [20, 21, 22],
    input_row_indices: [1, 1, 1] // all from input row 1
}
```
**Combine** using indices → Final output:
```
┌────┬───────┬─────┬───────┐
│ id │ start │ end │ value │
├────┼───────┼─────┼───────┤
│ 1 │ 10 │ 12 │ 10 │
│ 1 │ 10 │ 12 │ 11 │
│ 1 │ 10 │ 12 │ 12 │
│ 2 │ 20 │ 22 │ 20 │
│ 2 │ 20 │ 22 │ 21 │
│ 2 │ 20 │ 22 │ 22 │
└────┴───────┴─────┴───────┘
```
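
The combine step is essentially a gather ("take") of the input rows by `input_row_indices`, placed next to the function output. A minimal std-only sketch of that idea follows. Tuples stand in for Arrow columns, and `InputRow`, `JoinedRow`, and `combine_lateral` are hypothetical names, not the executor code in this PR.

```rust
/// One row of the outer side: (id, start, end).
type InputRow = (i64, i64, i64);
/// One joined row: the outer columns plus the generated `value`.
type JoinedRow = (i64, i64, i64, i64);

/// Pairs every output value with the input row that produced it.
fn combine_lateral(
    input: &[InputRow],
    output: &[i64],
    input_row_indices: &[u32],
) -> Vec<JoinedRow> {
    assert_eq!(
        output.len(),
        input_row_indices.len(),
        "one input index is required per output row"
    );
    output
        .iter()
        .zip(input_row_indices)
        .map(|(&value, &idx)| {
            // Gather the outer columns for this output row.
            let (id, start, end) = input[idx as usize];
            (id, start, end, value)
        })
        .collect()
}
```

No key comparison is involved: the indices already say which outer row each output row belongs to, so the join reduces to an index lookup.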
---
## What's Included
### Core Components
**1. New Trait** (`datafusion/catalog/src/table.rs`)
```rust
#[async_trait]
pub trait BatchedTableFunctionImpl: Send + Sync + Debug {
    fn name(&self) -> &str;
    fn signature(&self) -> &Signature;
    fn return_type(&self, arg_types: &[DataType]) -> Result<Schema>;
    async fn invoke_batch(
        &self,
        args: &[ArrayRef],
        projection: Option<&[usize]>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<BatchResultStream>;
}
```
**2. Logical Plan Nodes** (`datafusion/expr/src/logical_plan/plan.rs`)
- `StandaloneBatchedTableFunction` - for `SELECT * FROM func(1, 100)`
- `LateralBatchedTableFunction` - for `SELECT * FROM t CROSS JOIN LATERAL func(t.x)`
**3. Physical Executor** (`datafusion/catalog/src/batched_function/exec.rs`)
- Unified `BatchedTableFunctionExec` with two modes (Standalone/Lateral)
- Handles argument evaluation, function invocation, and result combination
**4. Optimizer Rules** (`datafusion/optimizer/src/`)
- Extended projection/filter/limit pushdown to new logical plan nodes
**5. Reference Implementation** (`datafusion/functions-table/src/generate_series_batched.rs`)
- Complete example showing the implementation pattern
- Demonstrates limit pushdown optimization
### SQL Examples Enabled
```sql
-- Standalone: constant arguments
SELECT * FROM generate_series(1, 100);
-- LATERAL: correlate with outer table
SELECT m.metric_id, t.ts
FROM metrics m
CROSS JOIN LATERAL generate_series(m.start_time, m.end_time) AS t(ts);
-- With optimizations (limit pushdown)
SELECT * FROM generate_series(1, 1000000) LIMIT 10;
-- Only generates ~10 rows instead of 1M
```
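
The limit pushdown in the last query comes down to stopping generation once enough rows exist, rather than truncating a fully materialized series. A minimal std-only sketch of that idea, with a hypothetical helper `generate_series_with_limit` that is not part of this PR:

```rust
/// Generates `start..=end`, stopping early once `limit` rows were produced.
fn generate_series_with_limit(start: i64, end: i64, limit: Option<usize>) -> Vec<i64> {
    // The range is lazy: nothing is materialized until `collect`.
    let series = start..=end;
    match limit {
        // `take` stops pulling from the range after `n` values.
        Some(n) => series.take(n).collect(),
        None => series.collect(),
    }
}
```

With `limit = Some(10)`, the range `1..=1_000_000` is only advanced ten times, which is the effect a pushed-down `limit` argument to `invoke_batch()` makes possible.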
---
## Testing
**Comprehensive test coverage**:
- Unit tests in `batched_function/exec.rs`
- Integration tests in `core/tests/batched_table_function.rs`
- SQL logic tests in `sqllogictest/test_files/lateral.slt`
- All tests pass
---
## Open Questions
### 1. Is a new trait the right approach?
This PR introduces `BatchedTableFunctionImpl` as a separate trait. An
alternative would be to use `TableProvider` wrappers.
### 2. Is the API appropriate?
```rust
async fn invoke_batch(
    &self,
    args: &[ArrayRef],
    projection: Option<&[usize]>,
    filters: &[Expr],
    limit: Option<usize>,
) -> Result<BatchResultStream>;
```
### 3. Is "Batched" the right terminology?
Current naming: `BatchedTableFunctionImpl`,
`StandaloneBatchedTableFunction`, etc.
### 4. Should we keep both APIs or eventually deprecate `TableFunctionImpl`?
---
## User-Facing Changes
**New SQL capabilities**:
```sql
-- LATERAL joins
SELECT t.id, s.value
FROM my_table t
CROSS JOIN LATERAL generate_series(t.start, t.end) AS s(value);
```
**New API for function developers**:
```rust
// Implement the trait
struct MyFunction { /* ... */ }
#[async_trait]
impl BatchedTableFunctionImpl for MyFunction {
    // name(), signature(), and return_type() omitted for brevity
    async fn invoke_batch(/* ... */) -> Result<BatchResultStream> {
        // Your implementation
    }
}
// Register in session
ctx.state_ref()
    .write()
    .register_batched_table_function("my_func", Arc::new(MyFunction));
```
**Breaking changes**:
The PR adds two new variants to the `LogicalPlan` enum, which can break downstream code that matches on it exhaustively.