alamb commented on code in PR #16734:
URL: https://github.com/apache/datafusion/pull/16734#discussion_r2205647117
##########
datafusion/datasource/src/source.rs:
##########
@@ -267,7 +269,23 @@ impl ExecutionPlan for DataSourceExec {
         partition: usize,
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        self.data_source.open(partition, Arc::clone(&context))
+        let stream = self.data_source.open(partition, Arc::clone(&context))?;
+        let batch_size = context.session_config().batch_size();
+        let threshold = context.session_config().batch_split_threshold();
+        if threshold > 0 && batch_size >= threshold {

Review Comment:
   As I mentioned above, I think it would be better if we always used the `BatchSplitStream` (or inlined the logic into `MemorySource`).

##########
datafusion/physical-plan/src/stream.rs:
##########
@@ -522,6 +524,155 @@ impl Stream for ObservedStream {
     }
 }

+pin_project! {
+    /// Stream wrapper that splits large [`RecordBatch`]es into smaller batches.
+    ///
+    /// This ensures upstream operators receive batches no larger than
+    /// `batch_size`, which can improve parallelism when data sources
+    /// generate very large batches.
+    ///
+    /// # Fields
+    ///
+    /// - `current_batch`: The batch currently being split, if any
+    /// - `offset`: Index of the next row to split from `current_batch`.
+    ///   This tracks our position within the current batch being split.
+    ///
+    /// # Invariants
+    ///
+    /// - `offset` is always ≤ `current_batch.num_rows()` when `current_batch` is `Some`
+    /// - When `current_batch` is `None`, `offset` is always 0
+    /// - `batch_size` is always > 0
+    pub struct BatchSplitStream {
+        #[pin]
+        input: SendableRecordBatchStream,
+        schema: SchemaRef,
+        batch_size: usize,
+        metrics: SplitMetrics,
+        current_batch: Option<RecordBatch>,
+        offset: usize,
+    }
+}
+
+impl BatchSplitStream {
+    /// Create a new [`BatchSplitStream`]
+    pub fn new(
+        input: SendableRecordBatchStream,
+        batch_size: usize,
+        metrics: SplitMetrics,
+    ) -> Self {
+        let schema = input.schema();
+        Self {
+            input,
+            schema,
+            batch_size,
+            metrics,
+            current_batch: None,
+            offset: 0,
+        }
+    }
+
+    /// Attempt to produce the next sliced batch from the current batch.
+    ///
+    /// Returns `Some(batch)` if a slice was produced, `None` if the current batch
+    /// is exhausted and we need to poll upstream for more data.
+    fn next_sliced_batch(&mut self) -> Option<Result<RecordBatch>> {
+        let batch = self.current_batch.take()?;
+
+        // Wrap slicing logic in a panic-safe block

Review Comment:
   What is the rationale for the panic-safe block? I don't think this is a common pattern elsewhere in the code.

##########
datafusion/common/src/config.rs:
##########
@@ -349,6 +349,11 @@ config_namespace! {
         /// metadata memory consumption
         pub batch_size: usize, default = 8192

+        /// Minimum batch size before DataFusion will attempt to
+        /// split oversized record batches coming from sources.
+        /// Set to 0 to disable splitting entirely.
+        pub batch_split_threshold: usize, default = 8192

Review Comment:
   I can't think of any use case where this should be something different than the existing `batch_size`. Thus I suggest removing this new config and always splitting using `batch_size`:
   https://github.com/apache/datafusion/blob/d66d6b9eb83a4643bc8d1ee9c2da7f0fa644c0cf/datafusion/common/src/config.rs#L350-L349

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org