ethan-tyler opened a new issue, #19774:
URL: https://github.com/apache/datafusion/issues/19774
### Is your feature request related to a problem or challenge?
This revisits the per-partition sink API originally proposed in #6339. That
issue proposed `write_stream(partition: usize, ...)`, but the simpler
`write_all(single_stream)` was implemented instead.
Today, DataSinkExec requires single-partition input: its
required_input_distribution() returns Distribution::SinglePartition, which
causes the physical optimizer to insert a merge/coalesce. This serializes
writes even when the upstream plan produces multiple partitions and the sink
could handle them independently.
### Describe the solution you'd like
Add an optional partition-aware sink interface. Sinks that implement it
receive multiple input partitions directly; sinks that don't continue to work
unchanged.
Something like this:
```rust
#[async_trait]
pub trait PartitionedDataSink: DataSink {
    /// Write a single input partition stream. Called once per input partition.
    async fn write_partition(
        &self,
        partition: usize,
        data: SendableRecordBatchStream,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;

    /// Called once after all partitions succeed. Commit here.
    async fn finish(
        &self,
        partition_results: Vec<u64>,
        context: &Arc<TaskContext>,
    ) -> Result<u64>;

    /// Called on error/cancel after writers stop. Rollback / cleanup here.
    async fn abort(
        &self,
        error: DataFusionError,
        context: &Arc<TaskContext>,
    ) -> Result<()>;
}
```
`write_partition` calls may run concurrently on the same sink instance;
implementations must be thread-safe (or internally shard state per partition).
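As a simplified illustration of the per-partition sharding approach, here is a std-only sketch. The types are hypothetical stand-ins for the DataFusion ones (no `SendableRecordBatchStream`, no async), intended only to show how concurrent writers can avoid contending on shared state:

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical sketch: each input partition writes only to its own slot,
// so concurrent write_partition calls never touch the same data.
struct ShardedSink {
    // One slot per input partition, sized up front.
    row_counts: Vec<Mutex<u64>>,
}

impl ShardedSink {
    fn new(num_partitions: usize) -> Self {
        Self {
            row_counts: (0..num_partitions).map(|_| Mutex::new(0)).collect(),
        }
    }

    // Stand-in for write_partition: record rows for one partition only.
    fn write_partition(&self, partition: usize, rows: u64) {
        *self.row_counts[partition].lock().unwrap() += rows;
    }

    // Stand-in for finish: aggregate per-partition results once.
    fn finish(&self) -> u64 {
        self.row_counts.iter().map(|m| *m.lock().unwrap()).sum()
    }
}

fn main() {
    let sink = Arc::new(ShardedSink::new(4));
    let handles: Vec<_> = (0..4)
        .map(|p| {
            let sink = Arc::clone(&sink);
            thread::spawn(move || sink.write_partition(p, 100))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    println!("{}", sink.finish()); // 400
}
```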
Capability hook (provided method on DataSink, non-breaking):
```rust
pub trait DataSink: DisplayAs + Debug + Send + Sync {
    // ... existing methods ...

    /// Returns self as a partition-aware sink, if supported.
    fn as_partitioned(&self) -> Option<&dyn PartitionedDataSink> {
        None
    }
}
```
Partition-aware sinks override this to return `Some(self)`.
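To illustrate the opt-in pattern, a minimal stand-in version (simplified traits, not the real DataFusion signatures; the async methods and stream types are omitted):

```rust
use std::fmt::Debug;

// Simplified stand-in traits to show the capability hook only.
trait PartitionedDataSink: DataSink {}

trait DataSink: Debug + Send + Sync {
    /// Default: not partition-aware, so existing sinks compile unchanged.
    fn as_partitioned(&self) -> Option<&dyn PartitionedDataSink> {
        None
    }
}

#[derive(Debug)]
struct LegacySink;
impl DataSink for LegacySink {} // inherits the None default

#[derive(Debug)]
struct ParallelSink;
impl PartitionedDataSink for ParallelSink {}
impl DataSink for ParallelSink {
    // Opt in by returning Some(self).
    fn as_partitioned(&self) -> Option<&dyn PartitionedDataSink> {
        Some(self)
    }
}

fn main() {
    let legacy: &dyn DataSink = &LegacySink;
    let parallel: &dyn DataSink = &ParallelSink;
    println!(
        "{} {}",
        legacy.as_partitioned().is_some(),
        parallel.as_partitioned().is_some()
    ); // false true
}
```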
Execution changes:
- If `sink.as_partitioned().is_some()`: return `UnspecifiedDistribution` (no
forced merge) and drive `write_partition` for each input partition in parallel
  - If all succeed: call `finish()` once and return the single-row result
  - On error/cancel: stop the other writers promptly, call `abort()`, and
return a coherent error
- If `sink.as_partitioned().is_none()`: keep the current behavior (return
`SinglePartition`, auto-merge)
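A rough sketch of that driver logic, using `std::thread` as a stand-in for DataFusion's async tasks. Everything here is hypothetical and simplified: a real implementation would cancel in-flight writers on the first error rather than let them run to completion, and would call the sink's `finish`/`abort` methods instead of the inline closures below:

```rust
use std::sync::Arc;
use std::thread;

// Run one writer per input partition in parallel, then commit once on
// success or surface the first error (abort path) on failure.
fn drive_partitioned_write(
    num_partitions: usize,
    write_partition: Arc<dyn Fn(usize) -> Result<u64, String> + Send + Sync>,
) -> Result<u64, String> {
    let handles: Vec<_> = (0..num_partitions)
        .map(|p| {
            let w = Arc::clone(&write_partition);
            thread::spawn(move || w(p))
        })
        .collect();

    let mut partition_results = Vec::with_capacity(num_partitions);
    let mut first_error = None;
    for h in handles {
        match h.join().expect("writer panicked") {
            Ok(rows) => partition_results.push(rows),
            // Keep the first error; remaining writers have already finished
            // here (a real implementation would cancel them promptly).
            Err(e) => {
                if first_error.is_none() {
                    first_error = Some(e);
                }
            }
        }
    }

    match first_error {
        // finish(): commit once with all per-partition results.
        None => Ok(partition_results.iter().sum()),
        // abort(): rollback/cleanup, then return a coherent error.
        Some(e) => Err(e),
    }
}

fn main() {
    let ok = drive_partitioned_write(4, Arc::new(|p| Ok((p as u64 + 1) * 10)));
    println!("{:?}", ok); // Ok(100)
}
```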
Ordering semantics:
- Sinks requiring a global ordering should continue to require
single-partition input
- Per-partition ordering is compatible with partition-parallel execution
- Add regression coverage for ordering bugs (see #16784)
### Describe alternatives you've considered
1. Do nothing - downstream projects manage parallelism outside DataFusion
2. Require all sinks to handle multiple partitions - too disruptive
3. Separate capability model disconnected from optimizer - prefer mapping to
existing distribution/ordering enforcement
### Additional context
Related issues:
- #6339
- #6569
- #13838
- #16784
Opening as a proposal to gather feedback. Happy to start smaller (e.g., just
the trait + capability hook) if preferred.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]