ozankabak commented on code in PR #13794:
URL: https://github.com/apache/datafusion/pull/13794#discussion_r1888572337


##########
datafusion/physical-plan/src/unnest.rs:
##########
@@ -278,28 +279,31 @@ impl UnnestStream {
     fn poll_next_impl(
         &mut self,
         cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Option<Result<RecordBatch>>> {
-        self.input
-            .poll_next_unpin(cx)
-            .map(|maybe_batch| match maybe_batch {
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        loop {
+            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
                 Some(Ok(batch)) => {
                     let timer = self.metrics.elapsed_compute.timer();
+                    self.metrics.input_batches.add(1);
+                    self.metrics.input_rows.add(batch.num_rows());
                     let result = build_batch(
                         &batch,
                         &self.schema,
                         &self.list_type_columns,
                         &self.struct_column_indices,
                         &self.options,
-                    );
-                    self.metrics.input_batches.add(1);
-                    self.metrics.input_rows.add(batch.num_rows());
-                    if let Ok(ref batch) = result {
-                        timer.done();
-                        self.metrics.output_batches.add(1);
-                        self.metrics.output_rows.add(batch.num_rows());
-                    }
-
-                    Some(result)
+                    )?;
+                    timer.done();
+                    let Some(result_batch) = result else {
+                        continue;
+                    };
+                    self.metrics.output_batches.add(1);
+                    self.metrics.output_rows.add(result_batch.num_rows());
+
+                    // Empty record batches should not be emitted.
+                    // They need to be treated as  [`Option<RecordBatch>`]es 
and handle separately

Review Comment:
   ```suggestion
                       // They need to be treated as [`Option<RecordBatch>`]es and handled separately
   ```



##########
datafusion/physical-plan/src/aggregates/row_hash.rs:
##########
@@ -768,6 +776,9 @@ impl Stream for GroupedHashAggregateStream {
                         let output = batch.slice(0, size);
                         (ExecutionState::ProducingOutput(remaining), output)
                     };
+                    // Empty record batches should not be emitted.
+                    // They need to be treated as  [`Option<RecordBatch>`]es 
and handle separately

Review Comment:
   ```suggestion
                       // They need to be treated as [`Option<RecordBatch>`]es and handled separately
   ```



##########
datafusion/physical-plan/src/sorts/partial_sort.rs:
##########
@@ -407,6 +407,9 @@ impl PartialSortStream {
                 self.is_closed = true;
             }
         }
+        // Empty record batches should not be emitted.
+        // They need to be treated as [`Option<RecordBatch>`]es and handle 
separately

Review Comment:
   ```suggestion
           // They need to be treated as [`Option<RecordBatch>`]es and handled separately
   ```



##########
datafusion/physical-plan/src/unnest.rs:
##########
@@ -573,16 +578,16 @@ fn build_batch(
                     true => batch.columns(),
                     false => &flatten_arrs,
                 };
-                let (temp_result, num_rows) = list_unnest_at_level(
+                let temp_result = list_unnest_at_level(
                     input,
                     list_type_columns,
                     &mut temp_unnested_result,
                     depth,
                     options,
                 )?;
-                if num_rows == 0 {
-                    return Ok(RecordBatch::new_empty(Arc::clone(schema)));
-                }
+                let Some(temp_result) = temp_result else {
+                    return Ok(None);
+                };

Review Comment:
   We can use the more succinct form:
   ```rust
                   let Some(temp_result) = list_unnest_at_level(
                       input,
                       list_type_columns,
                       &mut temp_unnested_result,
                       depth,
                       options,
                   )? else {
                       return Ok(None);
                   };
   ```



##########
datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs:
##########
@@ -979,26 +979,33 @@ impl BoundedWindowAggStream {
             return Poll::Ready(None);
         }
 
-        let result = match ready!(self.input.poll_next_unpin(cx)) {
+        match ready!(self.input.poll_next_unpin(cx)) {
             Some(Ok(batch)) => {
                 self.search_mode.update_partition_batch(
                     &mut self.input_buffer,
                     batch,
                     &self.window_expr,
                     &mut self.partition_buffers,
                 )?;
-                self.compute_aggregates()
+                let batch = self.compute_aggregates()?;
+                if let Some(batch) = batch {
+                    return Poll::Ready(Some(Ok(batch)));
+                }

Review Comment:
   Can we say this instead?
   ```suggestion
                   if let Some(batch) = self.compute_aggregates()? {
                       return Poll::Ready(Some(Ok(batch)));
                   }
   ```



##########
datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs:
##########
@@ -979,26 +979,33 @@ impl BoundedWindowAggStream {
             return Poll::Ready(None);
         }
 
-        let result = match ready!(self.input.poll_next_unpin(cx)) {
+        match ready!(self.input.poll_next_unpin(cx)) {
             Some(Ok(batch)) => {
                 self.search_mode.update_partition_batch(
                     &mut self.input_buffer,
                     batch,
                     &self.window_expr,
                     &mut self.partition_buffers,
                 )?;
-                self.compute_aggregates()
+                let batch = self.compute_aggregates()?;
+                if let Some(batch) = batch {
+                    return Poll::Ready(Some(Ok(batch)));
+                }
+                self.poll_next_inner(cx)
             }
-            Some(Err(e)) => Err(e),
+            Some(Err(e)) => Poll::Ready(Some(Err(e))),
             None => {
                 self.finished = true;
                 for (_, partition_batch_state) in 
self.partition_buffers.iter_mut() {
                     partition_batch_state.is_end = true;
                 }
-                self.compute_aggregates()
+                let batch = self.compute_aggregates()?;
+                if let Some(batch) = batch {
+                    return Poll::Ready(Some(Ok(batch)));
+                }

Review Comment:
   Can we say this instead?
   ```suggestion
                   if let Some(batch) = self.compute_aggregates()? {
                       return Poll::Ready(Some(Ok(batch)));
                   }
   ```



##########
datafusion/physical-plan/src/windows/window_agg_exec.rs:
##########
@@ -380,18 +381,23 @@ impl WindowAggStream {
         }
 
         loop {
-            let result = match ready!(self.input.poll_next_unpin(cx)) {
+            let maybe_batch = match ready!(self.input.poll_next_unpin(cx)) {
                 Some(Ok(batch)) => {
                     self.batches.push(batch);
                     continue;
                 }
                 Some(Err(e)) => Err(e),
                 None => self.compute_aggregates(),
+            }?;
+            let Some(result) = maybe_batch else {
+                return Poll::Ready(None);
             };

Review Comment:
   Can this go inside the `None` arm of the match above?



##########
datafusion/physical-plan/src/windows/window_agg_exec.rs:
##########
@@ -380,18 +381,23 @@ impl WindowAggStream {
         }
 
         loop {
-            let result = match ready!(self.input.poll_next_unpin(cx)) {
+            let maybe_batch = match ready!(self.input.poll_next_unpin(cx)) {
                 Some(Ok(batch)) => {
                     self.batches.push(batch);
                     continue;
                 }
                 Some(Err(e)) => Err(e),
                 None => self.compute_aggregates(),
+            }?;
+            let Some(result) = maybe_batch else {
+                return Poll::Ready(None);
             };
 
             self.finished = true;
-
-            return Poll::Ready(Some(result));
+            // Empty record batches should not be emitted.
+            // They need to be treated as  [`Option<RecordBatch>`]es and 
handle separately

Review Comment:
   ```suggestion
               // They need to be treated as [`Option<RecordBatch>`]es and handled separately
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to