bubulalabu opened a new pull request, #18535:
URL: https://github.com/apache/datafusion/pull/18535

   # Batched Table Functions with LATERAL Join Support
   
   **Note: This is a draft PR for gathering feedback on the approach. It's 
working, but not polished yet**
   
   I'd like to hear opinions on whether this is the right path forward before I start finalizing the PR.
   
   ## Which issue does this PR close?
   
   Closes #18121.
   
   ---
   
   ## The Problem
   
   DataFusion's current table function API (`TableFunctionImpl`) requires 
constant arguments at planning time and returns a `TableProvider`. This 
prevents LATERAL joins where table functions need to correlate with outer query 
rows.
   
   ```sql
   -- Current limitation: Cannot do this
   SELECT * FROM metrics m
   CROSS JOIN LATERAL generate_series(m.start_time, m.end_time) AS t(ts);
   ```
   
   ### Current Architecture Limitations
   
   ```rust
   // Current API
   trait TableFunctionImpl {
       fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>;
   }
   ```
   
   **Problems**:
   - Planning-time evaluation: arguments must be constants and cannot reference table columns
   - No LATERAL support: function calls cannot be correlated with input rows
   - Separate `TableProvider` per call: inefficient for batch processing
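A minimal, dependency-free model of the first limitation is sketched below. `Expr` and `bind_constant_args` are hypothetical stand-ins invented for illustration, not DataFusion's types. A column reference such as `m.start_time` has no value while the plan is being built, so an API that must reduce every argument to a constant can only reject it.

```rust
/// Hypothetical, simplified expression type: only the two cases that
/// matter for this limitation are modeled.
#[derive(Debug, Clone)]
enum Expr {
    Literal(i64),
    Column(String),
}

/// Mimics what a planning-time table function API has to do: turn every
/// argument into a constant before any rows have been read.
fn bind_constant_args(args: &[Expr]) -> Result<Vec<i64>, String> {
    args.iter()
        .map(|arg| match arg {
            Expr::Literal(v) => Ok(*v),
            // A column only has values at execution time, once rows of the
            // outer table are available, so planning has to fail here.
            Expr::Column(name) => Err(format!(
                "argument `{name}` is not a constant and cannot be evaluated at planning time"
            )),
        })
        .collect()
}
```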
   
   ---
   
   ## Proposed Solution
   
   Introduce a new trait that processes arrays of arguments (batched invocation) and returns output together with a mapping from each output row back to the input row that produced it:
   
   ```rust
   #[async_trait]
   trait BatchedTableFunctionImpl {
       async fn invoke_batch(
           &self,
           args: &[ArrayRef],            // Already-evaluated arrays
           projection: Option<&[usize]>, 
           filters: &[Expr],
           limit: Option<usize>,
       ) -> Result<BatchResultStream>;
   }
   
   // Results map back to input rows for LATERAL
   struct BatchResultChunk {
       output: RecordBatch,
       input_row_indices: Vec<u32>,  // for each output row, the index of the input row it came from
   }
   ```
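To show the intended contract without pulling in Arrow or an async runtime, the sketch below models a batched `generate_series` synchronously: `Vec<i64>` stands in for `ArrayRef`/`RecordBatch`, and a `Vec` of chunks stands in for `BatchResultStream`. The function name `generate_series_batched` and the one-chunk-per-input-row layout are assumptions for illustration only.

```rust
/// Simplified stand-in for the proposed chunk type (plain `Vec`s instead
/// of Arrow arrays).
#[derive(Debug, PartialEq)]
struct BatchResultChunk {
    /// Generated values (a single `value` column in this model).
    output: Vec<i64>,
    /// For each output row, the index of the input row that produced it.
    input_row_indices: Vec<u32>,
}

/// Hypothetical batched `generate_series(start, end)`: a single call covers
/// every input row, and each chunk records which row it came from.
fn generate_series_batched(start: &[i64], end: &[i64]) -> Vec<BatchResultChunk> {
    start
        .iter()
        .zip(end)
        .enumerate()
        .map(|(row, (&s, &e))| {
            // Inclusive range: generate_series(10, 12) yields 10, 11, 12.
            let output: Vec<i64> = (s..=e).collect();
            let input_row_indices = vec![row as u32; output.len()];
            BatchResultChunk { output, input_row_indices }
        })
        .collect()
}
```

Emitting one chunk per input row is just the simplest choice here; the proposal only requires that every output row carry its input row index, so an implementation could also pack several input rows into one chunk.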
   
   ### Key Design Decisions
   
   **1. Arrays as Arguments**
   - Like `ScalarUDF` - receives evaluated data, not planning-time expressions
   - Enables batched processing: one `invoke_batch` call covers many logical function calls (one per input row)
   - Natural fit for LATERAL: array elements correspond to input rows
   
   **2. Async API**
   - Matches `TableProvider::scan()` async pattern
   
   **3. Direct Pushdown Parameters**
   - Matches `TableProvider::scan()` pattern
   - Function gets projection/filter/limit directly in `invoke_batch()`
   
   **4. Streaming Results**
   - Returns `Stream<BatchResultChunk>` for memory-bounded execution
   - Chunks include `input_row_indices` for efficient LATERAL join 
implementation
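The memory-bounded streaming idea can be sketched as a lazy iterator that caps each chunk at `max_rows` rows and may split one input row's series across chunks. This is a hypothetical, synchronous model: `SeriesChunks` and `max_rows` are invented names, an `Iterator` stands in for the async stream, and `Vec`s stand in for Arrow arrays.

```rust
#[derive(Debug, PartialEq)]
struct BatchResultChunk {
    output: Vec<i64>,
    input_row_indices: Vec<u32>,
}

/// Hypothetical lazy producer for batched `generate_series(start, end)`.
/// Each `next` call emits at most `max_rows` rows, so memory stays bounded
/// even if one input row expands to millions of output rows.
struct SeriesChunks<'a> {
    start: &'a [i64],
    end: &'a [i64],
    max_rows: usize,
    row: usize,      // current input row
    next_value: i64, // next value to emit for `row`
}

impl<'a> SeriesChunks<'a> {
    fn new(start: &'a [i64], end: &'a [i64], max_rows: usize) -> Self {
        assert_eq!(start.len(), end.len(), "one start and one end per input row");
        assert!(max_rows > 0, "max_rows must be positive");
        let next_value = start.first().copied().unwrap_or(0);
        Self { start, end, max_rows, row: 0, next_value }
    }

    /// Move to the next input row and reset the cursor to its start value.
    fn advance_row(&mut self) {
        self.row += 1;
        if let Some(&s) = self.start.get(self.row) {
            self.next_value = s;
        }
    }
}

impl Iterator for SeriesChunks<'_> {
    type Item = BatchResultChunk;

    fn next(&mut self) -> Option<BatchResultChunk> {
        let mut output = Vec::new();
        let mut input_row_indices = Vec::new();
        while output.len() < self.max_rows && self.row < self.start.len() {
            let end = self.end[self.row];
            if self.next_value > end {
                // Empty range (start > end): this row contributes nothing.
                self.advance_row();
                continue;
            }
            output.push(self.next_value);
            input_row_indices.push(self.row as u32);
            if self.next_value == end {
                // Last value of this row; advancing here avoids overflow past `end`.
                self.advance_row();
            } else {
                self.next_value += 1;
            }
        }
        if output.is_empty() {
            None
        } else {
            Some(BatchResultChunk { output, input_row_indices })
        }
    }
}
```

Because each chunk carries its own `input_row_indices`, a chunk boundary can fall in the middle of a row's series without losing the correlation.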
   
   ---
   
   ## Architecture Overview
   
   ### Execution Flow Example: generate_series(start, end)
   
   **Input batch**:
   ```
   ┌────┬───────┬─────┐
   │ id │ start │ end │
   ├────┼───────┼─────┤
   │ 1  │ 10    │ 12  │
   │ 2  │ 20    │ 22  │
   └────┴───────┴─────┘
   ```
   
   **Evaluate args** → `start=[10, 20]`, `end=[12, 22]`
   
   **invoke_batch** → Stream of chunks:
   ```rust
   Chunk {
       output: [10, 11, 12],         // generated for start=10, end=12
       input_row_indices: [0, 0, 0]  // all from input row 0
   }
   Chunk {
       output: [20, 21, 22],
       input_row_indices: [1, 1, 1]  // all from input row 1
   }
   ```
   
   **Combine** using indices → Final output:
   ```
   ┌────┬───────┬─────┬───────┐
   │ id │ start │ end │ value │
   ├────┼───────┼─────┼───────┤
   │ 1  │ 10    │ 12  │ 10    │
   │ 1  │ 10    │ 12  │ 11    │
   │ 1  │ 10    │ 12  │ 12    │
   │ 2  │ 20    │ 22  │ 20    │
   │ 2  │ 20    │ 22  │ 21    │
   │ 2  │ 20    │ 22  │ 22    │
   └────┴───────┴─────┴───────┘
   ```
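The combine step amounts to a gather: each outer row is repeated once per output row whose index points at it, and the generated value is appended. The row-oriented sketch below uses hypothetical `InputRow` and `combine` names, chosen to avoid an Arrow dependency, and reproduces the final table above; a columnar implementation would presumably apply a take/gather kernel with `input_row_indices` to each outer column instead.

```rust
/// One row of the example input batch (`id`, `start`, `end`).
#[derive(Debug, Clone, Copy)]
struct InputRow {
    id: i64,
    start: i64,
    end: i64,
}

/// Hypothetical combine step: for every output row, look up the input row
/// it came from (a gather by index) and append the generated value.
/// Returns `(id, start, end, value)` tuples.
fn combine(input: &[InputRow], output: &[i64], input_row_indices: &[u32]) -> Vec<(i64, i64, i64, i64)> {
    assert_eq!(output.len(), input_row_indices.len(), "expected one index per output row");
    input_row_indices
        .iter()
        .zip(output)
        .map(|(&idx, &value)| {
            let row = input[idx as usize];
            (row.id, row.start, row.end, value)
        })
        .collect()
}
```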
   
   ---
   
   ## What's Included
   
   ### Core Components
   
   **1. New Trait** (`datafusion/catalog/src/table.rs`)
   ```rust
   #[async_trait]
   pub trait BatchedTableFunctionImpl: Send + Sync + Debug {
       fn name(&self) -> &str;
       fn signature(&self) -> &Signature;
       fn return_type(&self, arg_types: &[DataType]) -> Result<Schema>;
   
       async fn invoke_batch(
           &self,
           args: &[ArrayRef],
           projection: Option<&[usize]>,
           filters: &[Expr],
           limit: Option<usize>,
       ) -> Result<BatchResultStream>;
   }
   ```
   
   **2. Logical Plan Nodes** (`datafusion/expr/src/logical_plan/plan.rs`)
   - `StandaloneBatchedTableFunction` - for `SELECT * FROM func(1, 100)`
   - `LateralBatchedTableFunction` - for `SELECT * FROM t CROSS JOIN LATERAL func(t.x)`
   
   **3. Physical Executor** (`datafusion/catalog/src/batched_function/exec.rs`)
   - Unified `BatchedTableFunctionExec` with two modes (Standalone/Lateral)
   - Handles argument evaluation, function invocation, and result combination
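The difference between the two executor modes can be sketched as follows, under heavy simplification: `Mode`, `invoke`, and `execute` are hypothetical names, rows are `Vec<i64>` instead of Arrow batches, and argument expressions are reduced to either constants or column positions. Standalone mode invokes the function once with constant arguments and emits only the function's columns; Lateral mode evaluates the arguments against the input batch, invokes once for the whole batch, and prepends the matching outer row to each result.

```rust
/// Hypothetical model of the two modes of a batched table function operator.
enum Mode {
    /// `SELECT * FROM generate_series(1, 3)`: arguments are constants.
    Standalone { start: i64, end: i64 },
    /// `... CROSS JOIN LATERAL generate_series(t.a, t.b)`: arguments are
    /// read from the input columns at these positions.
    Lateral { start_col: usize, end_col: usize },
}

/// Stand-in for `invoke_batch`: generated values plus, for each value,
/// the index of the input row it belongs to.
fn invoke(start: &[i64], end: &[i64]) -> (Vec<i64>, Vec<u32>) {
    let mut values = Vec::new();
    let mut indices = Vec::new();
    for (row, (&s, &e)) in start.iter().zip(end).enumerate() {
        for v in s..=e {
            values.push(v);
            indices.push(row as u32);
        }
    }
    (values, indices)
}

/// Evaluate arguments, invoke the function, and combine the results.
/// `input` is row-major here for brevity.
fn execute(mode: &Mode, input: &[Vec<i64>]) -> Vec<Vec<i64>> {
    match *mode {
        Mode::Standalone { start, end } => {
            // One invocation; there are no outer columns to carry along.
            let (values, _) = invoke(&[start], &[end]);
            values.into_iter().map(|v| vec![v]).collect()
        }
        Mode::Lateral { start_col, end_col } => {
            // 1. Evaluate the argument "expressions" against the input batch.
            let starts: Vec<i64> = input.iter().map(|r| r[start_col]).collect();
            let ends: Vec<i64> = input.iter().map(|r| r[end_col]).collect();
            // 2. Invoke once for the whole batch.
            let (values, indices) = invoke(&starts, &ends);
            // 3. Combine: outer row (looked up by index) + generated value.
            indices
                .iter()
                .zip(values)
                .map(|(&i, v)| {
                    let mut row = input[i as usize].clone();
                    row.push(v);
                    row
                })
                .collect()
        }
    }
}
```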
   
   **4. Optimizer Rules** (`datafusion/optimizer/src/`)
   - Extended projection/filter/limit pushdown to new logical plan nodes
   
   **5. Reference Implementation** (`datafusion/functions-table/src/generate_series_batched.rs`)
   - Complete example showing the implementation pattern
   - Demonstrates limit pushdown optimization
   
   ### SQL Examples Enabled
   
   ```sql
   -- Standalone: constant arguments
   SELECT * FROM generate_series(1, 100);
   
   -- LATERAL: correlate with outer table
   SELECT m.metric_id, t.ts
   FROM metrics m
   CROSS JOIN LATERAL generate_series(m.start_time, m.end_time) AS t(ts);
   
   -- With optimizations (limit pushdown)
   SELECT * FROM generate_series(1, 1000000) LIMIT 10;
   -- Only generates ~10 rows instead of 1M
   ```
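Limit pushdown can be illustrated with a sketch like the one below, which assumes a standalone call and plain `Vec<i64>` output; `generate_series_with_limit` is a hypothetical name, not the function added by this PR. Because `take(n)` stops the range iterator early, values beyond the limit are never generated.

```rust
/// Hypothetical limit-aware generate_series: with `limit = Some(n)` it stops
/// after producing `n` values instead of materializing the whole range.
fn generate_series_with_limit(start: i64, end: i64, limit: Option<usize>) -> Vec<i64> {
    let series = start..=end;
    match limit {
        // `take` ends iteration after `n` items; later values are never computed.
        Some(n) => series.take(n).collect(),
        None => series.collect(),
    }
}
```

With chunked output, a real implementation may still finish the chunk it is building when the limit is reached, which is presumably why the example above says "~10 rows".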
   
   ---
   
   ## Testing
   
   **Comprehensive test coverage**:
   - Unit tests in `batched_function/exec.rs`
   - Integration tests in `core/tests/batched_table_function.rs`
   - SQL logic tests in `sqllogictest/test_files/lateral.slt`
   - All tests pass
   
   ---
   
   ## Open Questions
   
   ### 1. Is a new trait the right approach?
   
   This PR introduces `BatchedTableFunctionImpl` as a separate trait. An 
alternative would be to use `TableProvider` wrappers.
   
   ### 2. Is the API appropriate?
   
   ```rust
   async fn invoke_batch(
       &self,
       args: &[ArrayRef],
       projection: Option<&[usize]>,
       filters: &[Expr],
       limit: Option<usize>,
   ) -> Result<BatchResultStream>;
   ```
   
   ### 3. Is "Batched" the right terminology?
   
   Current naming: `BatchedTableFunctionImpl`, 
`StandaloneBatchedTableFunction`, etc.
   
   ### 4. Should we keep both APIs or eventually deprecate `TableFunctionImpl`?
   
   ---
   
   ## User-Facing Changes
   
   **New SQL capabilities**:
   ```sql
   -- LATERAL joins
   SELECT t.id, s.value
   FROM my_table t
   CROSS JOIN LATERAL generate_series(t.start, t.end) AS s(value);
   ```
   
   **New API for function developers**:
   ```rust
   // Implement the trait
   struct MyFunction { /* ... */ }
   
   #[async_trait]
   impl BatchedTableFunctionImpl for MyFunction {
       async fn invoke_batch(/* ... */) -> Result<BatchResultStream> {
           // Your implementation
       }
   }
   
   // Register in session
   ctx.state_ref()
       .write()
       .register_batched_table_function("my_func", Arc::new(MyFunction));
   ```
   
   **Breaking changes**:
   
   The PR adds two new variants to the `LogicalPlan` enum, so downstream code that matches on `LogicalPlan` exhaustively will need to handle them.
   
   

