kosiew commented on code in PR #22878:
URL: https://github.com/apache/datafusion/pull/22878#discussion_r3425448079
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -585,17 +584,25 @@ impl ExternalSorter {
/// ```
fn in_mem_sort_stream(
&mut self,
- metrics: BaselineMetrics,
+ is_output_stream: bool,
) -> Result<SendableRecordBatchStream> {
if self.in_mem_batches.is_empty() {
- return Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
- &self.schema,
- ))));
+ let empty_stream =
+ Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema)));
+ return Ok(if is_output_stream {
Review Comment:
Nice fix. I noticed that the three `if is_output_stream { ObservedStream::new(...) } else { ... }` blocks all preserve the same invariant.
It could be worth pulling this into a small helper, maybe something like
`self.observe_if_output(stream, is_output_stream)`, so that intermediate runs keep
their own metrics and the final output paths are consistently wrapped.
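A rough, self-contained sketch of the shape this could take. It does not use DataFusion: `BatchStream`, `OutputMetrics`, `Observed`, and `Sorter` are hypothetical stand-ins for `SendableRecordBatchStream`, `BaselineMetrics`, `ObservedStream`, and `ExternalSorter`, and `observe_if_output` is only the name suggested above, not an existing API. The point is that every return path funnels through one place that decides whether the stream gets observed.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for `SendableRecordBatchStream`: yields per-batch row counts.
type BatchStream = Box<dyn Iterator<Item = usize>>;

/// Hypothetical stand-in for `BaselineMetrics`: a shared output-row counter.
#[derive(Clone, Default)]
struct OutputMetrics {
    output_rows: Rc<Cell<usize>>,
}

/// Hypothetical stand-in for `ObservedStream`: records rows as batches are yielded.
struct Observed {
    inner: BatchStream,
    metrics: OutputMetrics,
}

impl Iterator for Observed {
    type Item = usize;
    fn next(&mut self) -> Option<usize> {
        let batch = self.inner.next()?;
        self.metrics
            .output_rows
            .set(self.metrics.output_rows.get() + batch);
        Some(batch)
    }
}

/// Hypothetical, heavily simplified stand-in for `ExternalSorter`.
struct Sorter {
    in_mem_batches: Vec<usize>,
    metrics: OutputMetrics,
}

impl Sorter {
    /// The single place that decides whether a stream counts toward output metrics.
    /// Intermediate (non-output) streams are returned untouched, leaving them to be
    /// instrumented with their own metrics by the caller.
    fn observe_if_output(&self, stream: BatchStream, is_output_stream: bool) -> BatchStream {
        if is_output_stream {
            Box::new(Observed {
                inner: stream,
                metrics: self.metrics.clone(),
            })
        } else {
            stream
        }
    }

    fn in_mem_sort_stream(&mut self, is_output_stream: bool) -> BatchStream {
        let stream: BatchStream = if self.in_mem_batches.is_empty() {
            Box::new(std::iter::empty())
        } else {
            // Placeholder for the real sort: drains the buffered batches in order.
            Box::new(std::mem::take(&mut self.in_mem_batches).into_iter())
        };
        // Every branch above goes through the same helper, so none can skip wrapping.
        self.observe_if_output(stream, is_output_stream)
    }
}
```

With this shape, adding a fourth return path later only requires producing a `BatchStream`; the observe-or-not decision cannot be forgotten because it is made once at the end.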
##########
datafusion/physical-plan/src/sorts/multi_level_merge.rs:
##########
@@ -225,25 +225,46 @@ impl MultiLevelMergeBuilder {
) -> Result<SendableRecordBatchStream> {
match (self.sorted_spill_files.len(), self.sorted_streams.len()) {
// No data so empty batch
- (0, 0) => Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
- &self.schema,
- )))),
+ (0, 0) => {
+ let empty_stream =
+ Box::pin(EmptyRecordBatchStream::new(Arc::clone(&self.schema)));
+ Ok(Box::pin(ObservedStream::new(
Review Comment:
Same small cleanup idea here. The empty, single-memory-stream, and
single-spill-file cases each choose a stream and then wrap it in
`ObservedStream` with the same metrics.
A helper like `fn observe_output(&self, stream: SendableRecordBatchStream) -> SendableRecordBatchStream`
would make the final-output metric invariant easier to spot and harder to
accidentally miss later.
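A minimal sketch of that refactor, again without DataFusion. `BatchStream`, `OutputMetrics`, `Observed`, and `MergeBuilder` are hypothetical stand-ins for `SendableRecordBatchStream`, the builder's metrics, `ObservedStream`, and `MultiLevelMergeBuilder`; a "spill file" is modeled as a plain `Vec` of batch row counts. Each match arm only picks a stream, and the wrapping happens once afterwards. The fallback arm is a placeholder: it concatenates inputs rather than performing the real sort-preserving multi-level merge, and routing it through the same helper is a choice made for this sketch only.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for `SendableRecordBatchStream`: yields per-batch row counts.
type BatchStream = Box<dyn Iterator<Item = usize>>;

/// Hypothetical stand-in for the builder's output metrics.
#[derive(Clone, Default)]
struct OutputMetrics {
    output_rows: Rc<Cell<usize>>,
}

/// Hypothetical stand-in for `ObservedStream`.
struct Observed {
    inner: BatchStream,
    metrics: OutputMetrics,
}

impl Iterator for Observed {
    type Item = usize;
    fn next(&mut self) -> Option<usize> {
        let batch = self.inner.next()?;
        self.metrics
            .output_rows
            .set(self.metrics.output_rows.get() + batch);
        Some(batch)
    }
}

/// Hypothetical, heavily simplified stand-in for `MultiLevelMergeBuilder`.
struct MergeBuilder {
    sorted_spill_files: Vec<Vec<usize>>,
    sorted_streams: Vec<BatchStream>,
    metrics: OutputMetrics,
}

impl MergeBuilder {
    /// Hypothetical helper: the one place where final output gets observed.
    fn observe_output(&self, stream: BatchStream) -> BatchStream {
        Box::new(Observed {
            inner: stream,
            metrics: self.metrics.clone(),
        })
    }

    fn create_output_stream(mut self) -> BatchStream {
        // Each arm only chooses a stream; wrapping happens once below.
        let stream: BatchStream =
            match (self.sorted_spill_files.len(), self.sorted_streams.len()) {
                // No data: empty stream.
                (0, 0) => Box::new(std::iter::empty()),
                // A single in-memory stream is passed through.
                (0, 1) => self.sorted_streams.remove(0),
                // A single spill file is read back directly.
                (1, 0) => Box::new(self.sorted_spill_files.remove(0).into_iter()),
                // Placeholder for the real merge: concatenates, does NOT preserve order.
                _ => {
                    let spills = std::mem::take(&mut self.sorted_spill_files);
                    let streams = std::mem::take(&mut self.sorted_streams);
                    Box::new(streams.into_iter().flatten().chain(spills.into_iter().flatten()))
                }
            };
        self.observe_output(stream)
    }
}
```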
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.