alamb commented on code in PR #11610: URL: https://github.com/apache/datafusion/pull/11610#discussion_r1696642951
##########
datafusion/physical-plan/src/coalesce_batches.rs:
##########
@@ -249,84 +270,178 @@ impl CoalesceBatchesStream {
             let input_batch = self.input.poll_next_unpin(cx);
             // records time on drop
             let _timer = cloned_time.timer();
-            match input_batch {
-                Poll::Ready(x) => match x {
-                    Some(Ok(batch)) => {
-                        let batch = gc_string_view_batch(&batch);
-
-                        // Handle fetch limit:
-                        if let Some(fetch) = self.fetch {
-                            if self.total_rows + batch.num_rows() >= fetch {
-                                // We have reached the fetch limit.
-                                let remaining_rows = fetch - self.total_rows;
-                                debug_assert!(remaining_rows > 0);
-
+            match ready!(input_batch) {
+                Some(result) => {
+                    let Ok(input_batch) = result else {
+                        return Poll::Ready(Some(result)); // pass back error
+                    };
+                    // Buffer the batch and either get more input if not enough
+                    // rows yet or output
+                    match self.coalescer.push_batch(input_batch) {
+                        Ok(None) => continue,
+                        res => {
+                            if self.coalescer.limit_reached() {
                                 self.is_closed = true;
-                                self.total_rows = fetch;
-                                // Trim the batch and add to buffered batches:
-                                let batch = batch.slice(0, remaining_rows);
-                                self.buffered_rows += batch.num_rows();
-                                self.buffer.push(batch);
-                                // Combine buffered batches:
-                                let batch = concat_batches(&self.schema, &self.buffer)?;
-                                // Reset the buffer state and return final batch:
-                                self.buffer.clear();
-                                self.buffered_rows = 0;
-                                return Poll::Ready(Some(Ok(batch)));
-                            }
-                        }
-                        self.total_rows += batch.num_rows();
-
-                        if batch.num_rows() >= self.target_batch_size
-                            && self.buffer.is_empty()
-                        {
-                            return Poll::Ready(Some(Ok(batch)));
-                        } else if batch.num_rows() == 0 {
-                            // discard empty batches
-                        } else {
-                            // add to the buffered batches
-                            self.buffered_rows += batch.num_rows();
-                            self.buffer.push(batch);
-                            // check to see if we have enough batches yet
-                            if self.buffered_rows >= self.target_batch_size {
-                                // combine the batches and return
-                                let batch = concat_batches(&self.schema, &self.buffer)?;
-                                // reset buffer state
-                                self.buffer.clear();
-                                self.buffered_rows = 0;
-                                // return batch
-                                return Poll::Ready(Some(Ok(batch)));
                             }
+                            return Poll::Ready(res.transpose());
                         }
                     }
-                    None => {
-                        self.is_closed = true;
-                        // we have reached the end of the input stream but there could still
-                        // be buffered batches
-                        if self.buffer.is_empty() {
-                            return Poll::Ready(None);
-                        } else {
-                            // combine the batches and return
-                            let batch = concat_batches(&self.schema, &self.buffer)?;
-                            // reset buffer state
-                            self.buffer.clear();
-                            self.buffered_rows = 0;
-                            // return batch
-                            return Poll::Ready(Some(Ok(batch)));
-                        }
-                    }
-                    other => return Poll::Ready(other),
-                },
-                Poll::Pending => return Poll::Pending,
+                }
+                None => {
+                    self.is_closed = true;
+                    // we have reached the end of the input stream but there could still
+                    // be buffered batches
+                    return match self.coalescer.finish() {
+                        Ok(None) => Poll::Ready(None),
+                        res => Poll::Ready(res.transpose()),
+                    };
+                }
             }
         }
     }
 }
 impl RecordBatchStream for CoalesceBatchesStream {
+    fn schema(&self) -> SchemaRef {
+        self.coalescer.schema()
+    }
+}
+
+/// Concatenate multiple record batches into larger batches
+///
+/// See [`CoalesceBatchesExec`] for more details.
+///
+/// Notes:
+///
+/// 1. The output rows is the same order as the input rows
+///
+/// 2. The output is a sequence of batches, with all but the last being at least
+///    `target_batch_size` rows.
+///
+/// 3. Eventually this may also be able to handle other optimizations such as a

Review Comment:
   this is my long term ambition -- to apply filter + coalesce in a single operation (and thus save a copy)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org