ethan-tyler opened a new issue, #19774:
URL: https://github.com/apache/datafusion/issues/19774

   ### Is your feature request related to a problem or challenge?
   
   This revisits the per-partition sink API originally proposed in #6339. That issue proposed write_stream(partition: usize, ...), but the simpler write_all(single_stream) was implemented instead.
   
   Today, DataSinkExec requires single-partition input: required_input_distribution() returns Distribution::SinglePartition, which causes the physical optimizer to insert a merge/coalesce before the sink. This serializes writes even when the upstream plan produces multiple partitions and the sink could handle them independently.
   
   ### Describe the solution you'd like
   
   Add an optional partition-aware sink interface. Sinks that implement it receive multiple input partitions directly; sinks that don't keep working unchanged.
   
   Something like this:
   ```rust
   #[async_trait]
   pub trait PartitionedDataSink: DataSink {
       /// Write a single input partition stream. Called once per input 
partition.
       async fn write_partition(
           &self,
           partition: usize,
           data: SendableRecordBatchStream,
           context: &Arc<TaskContext>,
       ) -> Result<u64>;
   
       /// Called once after all partitions succeed. Commit here.
       async fn finish(
           &self,
           partition_results: Vec<u64>,
           context: &Arc<TaskContext>,
       ) -> Result<u64>;
   
       /// Called on error/cancel after writers stop. Rollback / cleanup here.
       async fn abort(
           &self,
           error: DataFusionError,
           context: &Arc<TaskContext>,
       ) -> Result<()>;
   }
   ```
   
   write_partition calls may run concurrently on the same sink instance, so implementations must be thread-safe (or internally shard state per partition).
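   To make the thread-safety requirement concrete, here is a minimal stand-in (plain std Rust, no DataFusion types; ShardedSinkState and its methods are hypothetical names invented for illustration) showing the "internally shard state per partition" option — each partition touches only its own lock, so concurrent writers never contend:

   ```rust
   use std::sync::Mutex;
   use std::thread;

   /// Hypothetical sink state sharded per input partition so concurrent
   /// write_partition calls never contend on a single lock.
   struct ShardedSinkState {
       // One independent row counter per input partition.
       per_partition_rows: Vec<Mutex<u64>>,
   }

   impl ShardedSinkState {
       fn new(partitions: usize) -> Self {
           Self {
               per_partition_rows: (0..partitions).map(|_| Mutex::new(0)).collect(),
           }
       }

       /// Stand-in for the body of write_partition: each partition updates
       /// only its own shard, so partitions can proceed in parallel.
       fn write_batch(&self, partition: usize, rows: u64) {
           *self.per_partition_rows[partition].lock().unwrap() += rows;
       }

       /// Stand-in for finish: collect every shard's count once all writers stop.
       fn total_rows(&self) -> u64 {
           self.per_partition_rows
               .iter()
               .map(|m| *m.lock().unwrap())
               .sum()
       }
   }

   fn main() {
       let state = ShardedSinkState::new(4);
       // Simulate four partition writers running concurrently.
       thread::scope(|s| {
           for partition in 0..4 {
               let state = &state;
               s.spawn(move || {
                   for _ in 0..100 {
                       state.write_batch(partition, 10);
                   }
               });
           }
       });
       assert_eq!(state.total_rows(), 4_000);
       println!("total rows written: {}", state.total_rows());
   }
   ```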
   
   Capability hook (provided method on DataSink, non-breaking):
   ```rust
   pub trait DataSink: DisplayAs + Debug + Send + Sync {
       // ... existing methods ...
   
       /// Returns self as a partition aware sink, if supported.
       fn as_partitioned(&self) -> Option<&dyn PartitionedDataSink> { None }
   }
   ```
   Partition-aware sinks override it to return Some(self).
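   A self-contained sketch of why this default-method hook is non-breaking (stand-in traits with no DataFusion types; LegacySink and ParallelSink are hypothetical names): existing sinks compile untouched and report None, while opted-in sinks return themselves.

   ```rust
   /// Minimal stand-ins for the proposed traits, just to demonstrate the
   /// provided-method pattern; real signatures would use DataFusion types.
   trait DataSink {
       /// Provided method: sinks stay single-partition unless they opt in.
       fn as_partitioned(&self) -> Option<&dyn PartitionedDataSink> {
           None
       }
   }

   trait PartitionedDataSink: DataSink {
       fn write_partition(&self, partition: usize) -> u64;
   }

   /// A legacy sink: implements nothing extra, keeps current behavior.
   struct LegacySink;
   impl DataSink for LegacySink {}

   /// A partition-aware sink: overrides the hook to return itself.
   struct ParallelSink;
   impl DataSink for ParallelSink {
       fn as_partitioned(&self) -> Option<&dyn PartitionedDataSink> {
           Some(self)
       }
   }
   impl PartitionedDataSink for ParallelSink {
       fn write_partition(&self, partition: usize) -> u64 {
           partition as u64
       }
   }

   fn main() {
       let legacy: &dyn DataSink = &LegacySink;
       let parallel: &dyn DataSink = &ParallelSink;
       // The execution layer only needs this one capability check.
       assert!(legacy.as_partitioned().is_none());
       assert!(parallel.as_partitioned().is_some());
       println!("legacy is partition-aware: {}", legacy.as_partitioned().is_some());
       println!("parallel is partition-aware: {}", parallel.as_partitioned().is_some());
   }
   ```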
   
   Execution changes:
   - If sink.as_partitioned().is_some(): return UnspecifiedDistribution (no 
forced merge), drive write_partition for each input partition in parallel
   - If all succeed: call finish() once, return single row result
   - On error/cancel: stop other writers promptly, call abort(), return 
coherent error
   - If sink.as_partitioned().is_none(): current behavior (return 
SinglePartition, auto merge)
   
   Ordering semantics:
   - Sinks requiring a global ordering should continue to require single-partition input
   - Per partition ordering is compatible with partition parallel execution
   - Add regression coverage for ordering bugs (see #16784)
   
   ### Describe alternatives you've considered
   
   1. Do nothing - downstream projects manage parallelism outside DataFusion
   2. Require all sinks to handle multiple partitions - too disruptive
   3. Separate capability model disconnected from optimizer - prefer mapping to 
existing distribution/ordering enforcement
   
   ### Additional context
   
   Related issues:
   - #6339 
   - #6569 
   - #13838
   - #16784
   
   Opening as a proposal to gather feedback. Happy to start smaller (e.g., just 
the trait + capability hook) if preferred.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

