alamb commented on code in PR #11610:
URL: https://github.com/apache/datafusion/pull/11610#discussion_r1696642951


##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -249,84 +270,178 @@ impl CoalesceBatchesStream {
             let input_batch = self.input.poll_next_unpin(cx);
             // records time on drop
             let _timer = cloned_time.timer();
-            match input_batch {
-                Poll::Ready(x) => match x {
-                    Some(Ok(batch)) => {
-                        let batch = gc_string_view_batch(&batch);
-
-                        // Handle fetch limit:
-                        if let Some(fetch) = self.fetch {
-                            if self.total_rows + batch.num_rows() >= fetch {
-                                // We have reached the fetch limit.
-                                let remaining_rows = fetch - self.total_rows;
-                                debug_assert!(remaining_rows > 0);
-
+            match ready!(input_batch) {
+                Some(result) => {
+                    let Ok(input_batch) = result else {
+                        return Poll::Ready(Some(result)); // pass back error
+                    };
+                    // Buffer the batch and either get more input if not enough
+                    // rows yet or output
+                    match self.coalescer.push_batch(input_batch) {
+                        Ok(None) => continue,
+                        res => {
+                            if self.coalescer.limit_reached() {
                                 self.is_closed = true;
-                                self.total_rows = fetch;
-                                // Trim the batch and add to buffered batches:
-                                let batch = batch.slice(0, remaining_rows);
-                                self.buffered_rows += batch.num_rows();
-                                self.buffer.push(batch);
-                                // Combine buffered batches:
-                                let batch = concat_batches(&self.schema, 
&self.buffer)?;
-                                // Reset the buffer state and return final 
batch:
-                                self.buffer.clear();
-                                self.buffered_rows = 0;
-                                return Poll::Ready(Some(Ok(batch)));
-                            }
-                        }
-                        self.total_rows += batch.num_rows();
-
-                        if batch.num_rows() >= self.target_batch_size
-                            && self.buffer.is_empty()
-                        {
-                            return Poll::Ready(Some(Ok(batch)));
-                        } else if batch.num_rows() == 0 {
-                            // discard empty batches
-                        } else {
-                            // add to the buffered batches
-                            self.buffered_rows += batch.num_rows();
-                            self.buffer.push(batch);
-                            // check to see if we have enough batches yet
-                            if self.buffered_rows >= self.target_batch_size {
-                                // combine the batches and return
-                                let batch = concat_batches(&self.schema, 
&self.buffer)?;
-                                // reset buffer state
-                                self.buffer.clear();
-                                self.buffered_rows = 0;
-                                // return batch
-                                return Poll::Ready(Some(Ok(batch)));
                             }
+                            return Poll::Ready(res.transpose());
                         }
                     }
-                    None => {
-                        self.is_closed = true;
-                        // we have reached the end of the input stream but 
there could still
-                        // be buffered batches
-                        if self.buffer.is_empty() {
-                            return Poll::Ready(None);
-                        } else {
-                            // combine the batches and return
-                            let batch = concat_batches(&self.schema, 
&self.buffer)?;
-                            // reset buffer state
-                            self.buffer.clear();
-                            self.buffered_rows = 0;
-                            // return batch
-                            return Poll::Ready(Some(Ok(batch)));
-                        }
-                    }
-                    other => return Poll::Ready(other),
-                },
-                Poll::Pending => return Poll::Pending,
+                }
+                None => {
+                    self.is_closed = true;
+                    // we have reached the end of the input stream but there 
could still
+                    // be buffered batches
+                    return match self.coalescer.finish() {
+                        Ok(None) => Poll::Ready(None),
+                        res => Poll::Ready(res.transpose()),
+                    };
+                }
             }
         }
     }
 }
 
 impl RecordBatchStream for CoalesceBatchesStream {
+    fn schema(&self) -> SchemaRef {
+        self.coalescer.schema()
+    }
+}
+
+/// Concatenate multiple record batches into larger batches
+///
+/// See [`CoalesceBatchesExec`] for more details.
+///
+/// Notes:
+///
+/// 1. The output rows is the same order as the input rows
+///
+/// 2. The output is a sequence of batches, with all but the last being at 
least
+///    `target_batch_size` rows.
+///
+/// 3. Eventually this may also be able to handle other optimizations such as a

Review Comment:
   This is my long-term ambition: to apply filter + coalesce in a single operation (and thus save a copy).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to