Copilot commented on code in PR #1694:
URL: https://github.com/apache/auron/pull/1694#discussion_r2584104503
##########
native-engine/datafusion-ext-plans/src/common/execution_context.rs:
##########
@@ -141,6 +141,89 @@ impl ExecutionContext {
.counter(name.to_owned(), self.partition_id)
}
+ pub fn split_with_default_batch_size(
+ self: &Arc<Self>,
+ input: SendableRecordBatchStream,
+ ) -> SendableRecordBatchStream {
Review Comment:
The `split_with_default_batch_size` method lacks documentation. Consider
adding a doc comment that explains:
- The purpose of this method (splitting large batches into smaller ones)
- The behavior when batches are already smaller than the target size
- The target batch size used (from `batch_size()`)
- The relationship with `coalesce_with_default_batch_size`
Example:
```rust
/// Splits large record batches into smaller batches with sizes not exceeding
/// the default batch size. Batches smaller than or equal to the target size
/// are passed through unchanged. Empty batches are filtered out.
///
/// This is typically used in combination with
`coalesce_with_default_batch_size`
/// to normalize batch sizes in a stream.
```
##########
native-engine/datafusion-ext-plans/src/sort_merge_join_exec.rs:
##########
@@ -205,7 +205,8 @@ impl SortMergeJoinExec {
.sub_duration(poll_time.duration());
})
});
- Ok(exec_ctx.coalesce_with_default_batch_size(output))
+ Ok(exec_ctx
+
.coalesce_with_default_batch_size(exec_ctx.split_with_default_batch_size(output)))
Review Comment:
[nitpick] The composition
`coalesce_with_default_batch_size(split_with_default_batch_size(output))` may
create unnecessary overhead. After splitting large batches to the target size,
the coalesce operation will immediately try to merge them back if they're too
small (less than 1/4 of batch_size based on line 275 of execution_context.rs).
Consider whether both operations are needed here, or if just
`split_with_default_batch_size` would suffice for the sort merge join output.
The coalesce operation is typically used for small batches from sources like
filters or unions, but after splitting, batches should already be close to the
target size.
```suggestion
Ok(exec_ctx.split_with_default_batch_size(output))
```
##########
native-engine/datafusion-ext-plans/src/common/execution_context.rs:
##########
@@ -141,6 +141,89 @@ impl ExecutionContext {
.counter(name.to_owned(), self.partition_id)
}
+ pub fn split_with_default_batch_size(
+ self: &Arc<Self>,
+ input: SendableRecordBatchStream,
+ ) -> SendableRecordBatchStream {
+ struct SplitLargeBatchStream {
+ input: SendableRecordBatchStream,
+ current_batch: Option<RecordBatch>,
+ current_offset: usize,
+ }
+
+ impl SplitLargeBatchStream {
+ fn split_next_chunk(&mut self) -> Option<RecordBatch> {
+ let batch = self.current_batch.as_ref()?;
+ let target_batch_size = batch_size();
+ let num_rows = batch.num_rows();
+
+ if self.current_offset >= num_rows {
+ self.current_batch = None;
+ return None;
+ }
+
+ let chunk_size = std::cmp::min(target_batch_size, num_rows -
self.current_offset);
+ let chunk = batch.slice(self.current_offset, chunk_size);
+ self.current_offset += chunk_size;
+
+ if self.current_offset >= num_rows {
+ self.current_batch = None;
+ }
+
+ Some(chunk)
+ }
+ }
+
+ impl RecordBatchStream for SplitLargeBatchStream {
+ fn schema(&self) -> SchemaRef {
+ self.input.schema()
+ }
+ }
+
+ impl Stream for SplitLargeBatchStream {
+ type Item = Result<RecordBatch>;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) ->
Poll<Option<Self::Item>> {
+ loop {
+ if let Some(chunk) = self.split_next_chunk() {
+ return Poll::Ready(Some(Ok(chunk)));
+ }
+
+ match ready!(self.input.as_mut().poll_next_unpin(cx)) {
+ Some(Ok(batch)) => {
+ if batch.is_empty() {
+ continue;
+ }
+
+ let target_batch_size = batch_size();
+ if target_batch_size == 0 {
+ return
Poll::Ready(Some(Err(DataFusionError::Internal(
+ "Invalid batch size: 0".to_string(),
+ ))));
+ }
+
+ let num_rows = batch.num_rows();
+ if num_rows <= target_batch_size {
+ return Poll::Ready(Some(Ok(batch)));
+ } else {
+ self.current_batch = Some(batch);
+ self.current_offset = 0;
+ }
+ }
+ Some(Err(e)) => return Poll::Ready(Some(Err(e))),
+ None => return Poll::Ready(None),
+ }
+ }
+ }
+ }
+
+ Box::pin(SplitLargeBatchStream {
+ input,
+ current_batch: None,
+ current_offset: 0,
+ })
+ }
Review Comment:
The new `split_with_default_batch_size` method lacks test coverage. Consider
adding unit tests to verify:
- Splitting large batches (e.g., 10000 rows) into multiple smaller batches
- Passing through batches already at or below target size
- Handling empty batches
- Error handling for zero batch size
- Edge cases like batches with exactly target_batch_size rows
This is important to ensure the splitting logic works correctly, especially
for the error case at line 199-202.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]