bubulalabu commented on PR #18535:
URL: https://github.com/apache/datafusion/pull/18535#issuecomment-3596204994

   Thanks @alamb for the detailed feedback! This is really helpful. Let me 
address your concerns:
   
   ## On the Single API Question
   
   > However, I think it is important to make a single API for TableFunctions 
that can handle both cases:
   > 1. All arguments are constants (what is handled today)
   > 2. The arguments are actually tables (what this PR supports)
   
   I agree. The `BatchedTableFunctionImpl` trait proposed here handles both of 
those cases, plus a third:
   
   **1. Constant arguments** (what's handled today):
   ```sql
   SELECT * FROM generate_series(1, 100)
   ```
   
   **2. LATERAL joins** (row-correlated arguments):
   ```sql
   SELECT * FROM t CROSS JOIN LATERAL generate_series(t.start, t.end)
   ```
   
   **3. Table-valued parameters** (table as argument - future work):
   ```sql
   SELECT * FROM generate_series(SELECT start, end FROM t)
   ```
   
   The key idea is that the same trait works for all cases:
   
   - **Constant arguments**: The planner evaluates the constant expressions 
into a single-row batch, calls `invoke_batch` once with arrays of length 1, and 
returns the results. This is what the PR implements as "standalone batched 
table function."
   
   - **LATERAL joins**: The execution node receives batches from the input 
table, calls `invoke_batch` with column arrays, uses `input_row_indices` to 
correlate outputs to inputs, and combines them. This is fully implemented in 
the PR.
   
   - **Table-valued parameters**: The execution node would receive batches from 
the subquery, call `invoke_batch` with the subquery's column arrays, and pass 
through the results. The trait already provides all the building blocks for 
this - it's just a different execution node (future work).
   
   Same trait, different execution strategies based on the logical plan node.
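   
   To make the constant-argument path concrete, here's a minimal sketch of the 
planner-side glue (the helper name is hypothetical; `BatchedTableFunctionImpl` 
and `BatchResultStream` are the PR's types):
   
   ```rust
   use arrow::array::ArrayRef;
   use datafusion_common::{Result, ScalarValue};
   
   // Hypothetical helper: turn already-const-evaluated arguments into a
   // single-row "batch" and invoke the function exactly once (N = 1).
   async fn invoke_with_constants<F: BatchedTableFunctionImpl>(
       func: &F,
       constant_args: &[ScalarValue],
   ) -> Result<BatchResultStream> {
       // Each constant becomes an array of length 1: a batch of one invocation.
       let args: Vec<ArrayRef> = constant_args
           .iter()
           .map(|v| v.to_array_of_size(1))
           .collect::<Result<_>>()?;
   
       // No projection/filter/limit pushdown here; every output row
       // necessarily maps back to input row 0.
       func.invoke_batch(&args, None, &[], None).await
   }
   ```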
   
   ## On the Execution Model
   
   > I don't understand this execution model -- my mental model is that a table 
function receives an arbitrary table as input, and produces an arbitrary table 
as output.
   >
   > So in my mind this corresponds to getting a SendableRecordBatchStream as 
input and returning a SendableRecordBatchStream as output
   
   That's what's happening. The key is where the stream boundaries are.
   
   The trait signature is:
   ```rust
   async fn invoke_batch(
       &self,
       args: &[ArrayRef],
       projection: Option<&[usize]>,
       filters: &[Expr],
       limit: Option<usize>,
   ) -> Result<BatchResultStream>  // a stream of BatchResultChunk
   ```
   
   But in the execution context:
   - **Input**: The execution node (`BatchedTableFunctionExec`) receives a 
`SendableRecordBatchStream` from its child
   - **Processing**: For each `RecordBatch` from that stream, it evaluates the 
argument expressions to get `&[ArrayRef]`, then calls `invoke_batch`
   - **Output**: The execution node returns a `SendableRecordBatchStream` that 
chains together all the result chunks
   
   So from the execution plan perspective, it's exactly 
`SendableRecordBatchStream` → `SendableRecordBatchStream`.
   
   The reason the trait operates on `&[ArrayRef]` instead of 
`SendableRecordBatchStream` is to match DataFusion's existing function patterns:
   - `ScalarUDFImpl`: `&[ArrayRef]` → `ArrayRef`
   - `AggregateUDFImpl`: `&[ArrayRef]` → scalar
   - `WindowUDFImpl`: `&[ArrayRef]` (partition) → `ArrayRef`
   - **`BatchedTableFunctionImpl`**: `&[ArrayRef]` → `Stream<BatchResultChunk>`
   
   All of these receive already-evaluated array arguments. The execution nodes 
handle the streaming/batching.
   
   ## On Buffering Concerns
   
   > It assumes the entire input will be buffered in memory, which is different 
than most of the rest of DataFusion which is "streaming" (operates on a batch 
at a time)
   
   I wanted to clarify this - the implementation is fully streaming using 
`SendableRecordBatchStream` throughout:
   
   ```rust
   impl ExecutionPlan for BatchedTableFunctionExec {
       fn execute(&self, partition: usize, context: Arc<TaskContext>)
           -> Result<SendableRecordBatchStream> {
   
           // Get streaming input
           let input_stream = self.input.execute(partition, context)?;
   
           // Return wrapped stream
           Ok(Box::pin(BatchedTableFunctionStream {
               input_stream,  // SendableRecordBatchStream
               // ...
           }))
       }
   }
   ```
   
   **Execution flow:**
   1. `input.execute()` returns `SendableRecordBatchStream`
   2. `BatchedTableFunctionStream` polls this stream for batches
   3. For each batch: evaluate args, call `invoke_batch`, stream results
   4. No buffering - processes one RecordBatch at a time
   
   The "batching" refers to DataFusion's natural RecordBatch size, not 
buffering the entire table.
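   
   For illustration, here's that loop written as a free-standing function 
(simplified: the real implementation is a poll-based `Stream` state machine, 
and this assumes the trait is object-safe, e.g. via `async_trait`, and that 
`BatchResultStream` yields `Result<BatchResultChunk>`):
   
   ```rust
   use std::sync::Arc;
   use arrow::array::ArrayRef;
   use datafusion_common::Result;
   use datafusion_execution::SendableRecordBatchStream;
   use datafusion_physical_expr::PhysicalExpr;
   use futures::StreamExt;
   
   // Simplified: drive one partition's input through the table function.
   async fn drive(
       mut input_stream: SendableRecordBatchStream,
       func: Arc<dyn BatchedTableFunctionImpl>,
       arg_exprs: Vec<Arc<dyn PhysicalExpr>>,
   ) -> Result<()> {
       // One input RecordBatch at a time -- nothing else is held in memory.
       while let Some(batch) = input_stream.next().await.transpose()? {
           // Evaluate the argument expressions against this batch only.
           let args: Vec<ArrayRef> = arg_exprs
               .iter()
               .map(|e| e.evaluate(&batch)?.into_array(batch.num_rows()))
               .collect::<Result<_>>()?;
   
           // One invoke_batch per input batch; forward chunks as they arrive.
           let mut chunks = func.invoke_batch(&args, None, &[], None).await?;
           while let Some(chunk) = chunks.next().await.transpose()? {
               // ... emit chunk.output downstream, combined with the input
               // columns via chunk.input_row_indices
           }
       }
       Ok(())
   }
   ```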
   
   ## On the ScanArgs Extension
   
   > It seems like it would be a much smaller / simpler API to simple add a new 
field to ScanArgs
   
   I explored this approach, but there's a fundamental timing issue:
   
   **Problem**: `TableProvider::scan()` is called at planning time, but the 
input stream only exists at execution time.
   
   ```rust
   // During planning
   let provider = table_function.call(args)?;  // Creates TableProvider
   let exec_plan = provider.scan(args)?;       // Returns ExecutionPlan,
                                               // but the input stream doesn't exist yet!
   
   // During execution
   let stream = exec_plan.execute()?;          // Now we have data
   ```
   
   You'd need `scan()` to return an `ExecutionPlan` that "will receive a stream 
later", which breaks the TableProvider abstraction.
   
   Additionally, `TableProvider` has no concept of row correlation (which 
output came from which input), which is essential for LATERAL semantics. The 
join would need to happen externally, but then you've essentially reimplemented 
what `BatchedTableFunctionExec` already does.
   
   ## On API Surface
   
   > A second trait that is very similar to TableFunctionImpl and all the 
resulting boiler plate / API surface (eg. registration functions, etc)
   
   I totally understand the concern about having two similar traits. The 
current `TableFunctionImpl` is a planning-time TableProvider generator:
   
   ```rust
   trait TableFunctionImpl {
       fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>>
   }
   ```
   
   It receives constant expressions at planning time and returns a 
TableProvider.
   
   `BatchedTableFunctionImpl` is a superset - it handles everything the current 
trait does (constant arguments) plus LATERAL joins.
   
   **On the naming:**
   
   I used "Batched" only because `TableFunctionImpl` is already taken. The 
"batched" refers to processing arrays (batches of values) at execution time, 
matching other UDF traits. **Ideally this would just be called 
`TableFunctionImpl`**.
   
   ## On Decorrelation and Future Compatibility
   
   > This is what I aspire to support in DataFusion -- generic decorrelated 
joins. Thus I would like to create a TableFunction API that does not need major 
rework if/when we support decorrelated joins
   >
   > For example, it seems like LogicalPlan::LateralBatchedTableFunction is 
very specialized and wouldn't be used with anything other than a LATERAL table 
function.
   
   The API doesn't prevent future decorrelation optimizations. The trait is 
designed to support it:
   
   - **Stateless**: The `invoke_batch` method has no hidden state - it's a pure 
function that processes `&[ArrayRef]` arguments
   - **Determinism-aware**: The `signature()` method returns a `Signature` with 
volatility (Immutable/Stable/Volatile), allowing the optimizer to know if 
results can be cached or if duplicate inputs produce duplicate outputs
   - **Optimizer-friendly**: The explicit `LateralBatchedTableFunction` logical 
node tells the optimizer this is a correlated operation
   
   DuckDB deduplicates correlated arguments and uses hash joins to reconnect 
results. A similar approach could work here:
   
   ```
   Physical planning creates:
       HashAggregate(input, group_by=[correlated_cols])  // Deduplicate
           ↓
       BatchedTableFunctionExec(deduplicated, func)
           ↓
       HashJoin(original_input, results, on=[correlated_cols])
   ```
   
   For example, if a function's `Signature` declares `Volatility::Immutable`, 
the optimizer knows that identical inputs always produce identical outputs, so 
it can safely deduplicate the arguments.
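   
   A tiny worked example of what the deduplication buys (made-up values):
   
   ```
   Input rows (t.x):      3, 3, 7     -- three correlated calls
   After HashAggregate:   3, 7        -- two distinct argument values
   invoke_batch(N=2) runs the function twice; the HashJoin on x then
   fans the two result sets back out to all three input rows.
   ```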
   
   The `LateralBatchedTableFunction` logical node is specialized for LATERAL, 
but that's intentional - it tells the optimizer "this is a correlated 
operation". When you write:
   
   ```sql
   SELECT * FROM t CROSS JOIN LATERAL my_func(t.x, t.y)
   ```
   
   The planner needs to know this is correlated so it can choose the right 
execution strategy (nested loop, decorrelation, etc.). This is similar to how 
DuckDB has specialized logical nodes for correlated operations.
   
   ## On Table-Valued Parameters
   
   > For example, how would we plan a query that passed the results of one 
query to a table function?
   >
   > SELECT * from my_table_func(select time, log FROM t)
   
   This would use a different logical node but the same trait:
   
   **For LATERAL joins:**
   - Logical node: `LateralBatchedTableFunction(input, func, args=[col refs])`
   - Execution node: `BatchedTableFunctionExec`
   - Uses `input_row_indices` to correlate: which output rows came from which 
input rows
   - Combines input columns + output columns
   
   **For table-valued parameters:**
   - Logical node: `TableValuedFunction(subquery, func)` (future work)
   - Execution node: Same `BatchedTableFunctionExec` (or simplified variant)
   - Subquery executes, produces `SendableRecordBatchStream`
   - Calls `invoke_batch` with subquery column values
   - Ignores `input_row_indices` (or treats as identity mapping)
   - Just passes through the transformed stream
   
   Same trait, different logical plan nodes. The trait is the execution-time 
API, the logical nodes express the semantics.
   
   ## The Unified Trait
   
   Here's the core trait definition:
   
   ```rust
   pub trait BatchedTableFunctionImpl: Send + Sync + Debug {
       fn name(&self) -> &str;
       fn signature(&self) -> &Signature;  // Type checking & volatility
       fn return_type(&self, arg_types: &[DataType]) -> Result<Schema>;
   
       fn supports_filters_pushdown(&self, filters: &[&Expr])
           -> Result<Vec<TableProviderFilterPushDown>>;
   
       async fn invoke_batch(
           &self,
           args: &[ArrayRef],          // N invocations as arrays of length N
           projection: Option<&[usize]>,
           filters: &[Expr],
           limit: Option<usize>,
       ) -> Result<BatchResultStream>;  // Stream of BatchResultChunk
   }
   
   pub struct BatchResultChunk {
       pub output: RecordBatch,           // Generated rows
       pub input_row_indices: Vec<u32>,  // Maps each output row to an input row in [0..N)
   }
   ```
   
   Key methods:
   - **`signature()`**: Defines argument types and volatility for optimization 
(see decorrelation section above)
   - **`return_type()`**: Infers output schema from input types during logical 
planning
   - **`supports_filters_pushdown()`**: Allows pushing filters into the 
function for efficient execution
   - **`invoke_batch()`**: Processes a batch of N invocations and returns a 
stream of `BatchResultChunk`s, where `input_row_indices` maps output rows back 
to their source input rows (worked example below)
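   
   To make the `input_row_indices` contract concrete, here's a hand-constructed 
chunk for a batch of two `generate_series` invocations (illustrative only, not 
code from the PR):
   
   ```rust
   use std::sync::Arc;
   use arrow::array::{ArrayRef, Int64Array};
   use arrow::datatypes::{DataType, Field, Schema};
   use arrow::record_batch::RecordBatch;
   use datafusion_common::Result;
   
   // input row 0: generate_series(1, 2) → outputs 1, 2
   // input row 1: generate_series(5, 6) → outputs 5, 6
   fn example_chunk() -> Result<BatchResultChunk> {
       let schema = Arc::new(Schema::new(vec![
           Field::new("value", DataType::Int64, false),
       ]));
       let output = RecordBatch::try_new(
           schema,
           vec![Arc::new(Int64Array::from(vec![1, 2, 5, 6])) as ArrayRef],
       )?;
       Ok(BatchResultChunk {
           output,
           // Output rows 0-1 came from input row 0; rows 2-3 from input row 1.
           input_row_indices: vec![0, 0, 1, 1],
       })
   }
   ```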
   
   Example implementation for `generate_series`:
   
   ```rust
   impl BatchedTableFunctionImpl for GenerateSeries {
       fn signature(&self) -> &Signature {
           // Stored on the struct and constructed once as:
           //   Signature::new(
           //       TypeSignature::Exact(vec![Int64, Int64]),
           //       Volatility::Immutable,
           //   )
           // Volatility::Immutable means same inputs → same outputs,
           // which enables the decorrelation optimization.
           &self.signature
       }
   
       async fn invoke_batch(
           &self,
           args: &[ArrayRef],
           _projection: Option<&[usize]>,
           _filters: &[Expr],
           _limit: Option<usize>,
       ) -> Result<BatchResultStream> {
           let starts = as_int64_array(&args[0])?;  // N start values
           let ends = as_int64_array(&args[1])?;    // N end values
   
           // Process N invocations (where N = args[0].len()).
           // The same implementation works for:
           // - Constant arguments: N = 1, a single invocation
           // - LATERAL joins: N = batch size from the input table
           // - Table-valued parameters: N = rows from the subquery result
           // ... build and return the stream of BatchResultChunks
       }
   }
   ```
   
   ## Comparison with DuckDB's Approach
   
   **DuckDB's implicit correlation:**
   - Table functions maintain internal state tracking which input row they're 
processing
   - Processes input rows sequentially
   - Correlation is implicit in execution order
   - State is in `LocalTableFunctionState`
   
   **This PR's explicit correlation:**
   - Table functions are stateless - no hidden state
   - Can process input rows in any order
   - Correlation is explicit via `input_row_indices`
   - Simpler to reason about and debug
   
   The explicit approach offers more flexibility for future optimizations like 
parallel invocation or reordering, whereas DuckDB's stateful model requires 
sequential processing.
   
   Additionally, the batched invocation model gives implementations more 
freedom:
   - They can decide to process inputs in parallel (e.g., spawn multiple 
threads to handle different ranges)
   - They can apply context-specific optimizations (e.g., vectorized 
processing, SIMD operations across the batch)
   - They can amortize setup costs across multiple invocations (e.g., single 
connection pool checkout for N database queries)
   
   That said, DuckDB's approach is proven in production and there may be 
advantages I'm not seeing!
   
   ## Performance Benchmarks
   
   I ran some benchmarks comparing this implementation against DuckDB v1.3.2 to 
get a sense of how the batched approach performs for LATERAL joins:
   
   **LATERAL Join Performance:**
   
   | LATERAL Calls | DuckDB Time (s) | DataFusion Time (s) | Speedup |
   |---------------|-----------------|---------------------|---------|
   | 100,000       | 0.06            | 0.01                | 6.0x    |
   | 500,000       | 0.16            | 0.01                | 16.0x   |
   | 1,000,000     | 0.32            | 0.02                | 16.0x   |
   | 2,000,000     | 0.55            | 0.02                | 27.5x   |
   | 5,000,000     | 1.33            | 0.03                | 44.3x   |
   | 10,000,000    | 2.64            | 0.06                | 44.0x   |
   | 20,000,000    | 5.24            | 0.10                | 52.4x   |
   
   Key findings:
   - The batched processing scales well - the speedup grows from 6x at 100K 
calls to 52x at 20M calls
   - It handles large numbers of correlated calls efficiently (20 million calls 
in 0.10 seconds)
   - The performance advantage grows with scale
   
   Note: These are microbenchmarks using `generate_series`.
   
   The results suggest the explicit `input_row_indices` approach is more 
efficient than DuckDB's stateful iteration for this use case.

