zhuqi-lucas commented on code in PR #17105: URL: https://github.com/apache/datafusion/pull/17105#discussion_r2265100256
########## datafusion/physical-plan/src/coalesce_batches.rs: ########## @@ -277,102 +275,41 @@ impl Stream for CoalesceBatchesStream { } } -/// Enumeration of possible states for `CoalesceBatchesStream`. -/// It represents different stages in the lifecycle of a stream of record batches. -/// -/// An example of state transition: -/// Notation: -/// `[3000]`: A batch with size 3000 -/// `{[2000], [3000]}`: `CoalesceBatchStream`'s internal buffer with 2 batches buffered -/// Input of `CoalesceBatchStream` will generate three batches `[2000], [3000], [4000]` -/// The coalescing procedure will go through the following steps with 4096 coalescing threshold: -/// 1. Read the first batch and get it buffered. -/// - initial state: `Pull` -/// - initial buffer: `{}` -/// - updated buffer: `{[2000]}` -/// - next state: `Pull` -/// 2. Read the second batch, the coalescing target is reached since 2000 + 3000 > 4096 -/// - initial state: `Pull` -/// - initial buffer: `{[2000]}` -/// - updated buffer: `{[2000], [3000]}` -/// - next state: `ReturnBuffer` -/// 4. Two batches in the batch get merged and consumed by the upstream operator. -/// - initial state: `ReturnBuffer` -/// - initial buffer: `{[2000], [3000]}` -/// - updated buffer: `{}` -/// - next state: `Pull` -/// 5. Read the third input batch. -/// - initial state: `Pull` -/// - initial buffer: `{}` -/// - updated buffer: `{[4000]}` -/// - next state: `Pull` -/// 5. The input is ended now. Jump to exhaustion state preparing the finalized data. -/// - initial state: `Pull` -/// - initial buffer: `{[4000]}` -/// - updated buffer: `{[4000]}` -/// - next state: `Exhausted` -#[derive(Debug, Clone, Eq, PartialEq)] -enum CoalesceBatchesStreamState { - /// State to pull a new batch from the input stream. - Pull, - /// State to return a buffered batch. - ReturnBuffer, - /// State indicating that the stream is exhausted. 
- Exhausted, -} - impl CoalesceBatchesStream { fn poll_next_inner( self: &mut Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Result<RecordBatch>>> { let cloned_time = self.baseline_metrics.elapsed_compute().clone(); loop { - match &self.inner_state { - CoalesceBatchesStreamState::Pull => { - // Attempt to pull the next batch from the input stream. - let input_batch = ready!(self.input.poll_next_unpin(cx)); - // Start timing the operation. The timer records time upon being dropped. - let _timer = cloned_time.timer(); - - match input_batch { - Some(Ok(batch)) => match self.coalescer.push_batch(batch) { - CoalescerState::Continue => {} - CoalescerState::LimitReached => { - self.inner_state = CoalesceBatchesStreamState::Exhausted; - } - CoalescerState::TargetReached => { - self.inner_state = - CoalesceBatchesStreamState::ReturnBuffer; - } - }, - None => { - // End of input stream, but buffered batches might still be present. - self.inner_state = CoalesceBatchesStreamState::Exhausted; - } - other => return Poll::Ready(other), - } - } - CoalesceBatchesStreamState::ReturnBuffer => { - let _timer = cloned_time.timer(); - // Combine buffered batches into one batch and return it. - let batch = self.coalescer.finish_batch()?; - // Set to pull state for the next iteration. - self.inner_state = CoalesceBatchesStreamState::Pull; - return Poll::Ready(Some(Ok(batch))); + // If there is any completed batch ready, return it + if let Some(batch) = self.coalescer.next_completed_batch() { + return Poll::Ready(Some(Ok(batch))); + } + if self.completed { + // If input is done and no batches are ready, return None to signal end of stream. + return Poll::Ready(None); + } + // Attempt to pull the next batch from the input stream. + let input_batch = ready!(self.input.poll_next_unpin(cx)); + // Start timing the operation. The timer records time upon being dropped. 
+ let _timer = cloned_time.timer(); + + match input_batch { + None => { + // Input stream is exhausted, finalize any remaining batches + self.completed = true; + self.coalescer.finish()?; } - CoalesceBatchesStreamState::Exhausted => { - // Handle the end of the input stream. - return if self.coalescer.is_empty() { - // If buffer is empty, return None indicating the stream is fully consumed. - Poll::Ready(None) - } else { - let _timer = cloned_time.timer(); - // If the buffer still contains batches, prepare to return them. - let batch = self.coalescer.finish_batch()?; - Poll::Ready(Some(Ok(batch))) - }; + Some(Ok(batch)) => { + if self.coalescer.push_batch(batch)? { Review Comment: Addressed in the latest PR: it now returns an enum. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org