Dandandan commented on code in PR #14766:
URL: https://github.com/apache/datafusion/pull/14766#discussion_r1965294394

##########
datafusion-cli/src/exec.rs:
##########
@@ -247,10 +253,29 @@ pub(super) async fn exec_and_print(
             let stream = execute_stream(physical_plan, task_ctx.clone())?;
             print_options.print_stream(stream, now).await?;
         } else {
-            // Bounded stream; collected results are printed after all input consumed.
+            // Bounded stream; collected results size is limited by the maxrows option
             let schema = physical_plan.schema();
-            let results = collect(physical_plan, task_ctx.clone()).await?;
-            adjusted.into_inner().print_batches(schema, &results, now)?;
+            let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
+            let mut results = vec![];
+            let mut row_count = 0_usize;
+            while let Some(batch) = stream.next().await {
+                let batch = batch?;
+                let curr_num_rows = batch.num_rows();
+                if let MaxRows::Limited(max_rows) = print_options.maxrows {
+                    // Stop collecting results if the number of rows exceeds the limit
+                    // results batch should include the last batch that exceeds the limit
+                    if row_count < max_rows + curr_num_rows {
+                        // Try to grow the reservation to accommodate the batch in memory
+                        reservation.try_grow(get_record_batch_memory_size(&batch))?;
+                        results.push(batch);
+                    }
+                }
+                row_count += curr_num_rows;
+            }
+            adjusted
+                .into_inner()
+                .print_batches(schema, &results, now, row_count)?;

Review Comment:
   It would be nice if it could even print directly from the stream instead of collecting.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

For additional commands, e-mail: github-h...@datafusion.apache.org
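
The review comment above suggests printing from the stream rather than collecting batches first. A minimal sketch of that idea follows, using only the Rust standard library. It is not DataFusion's API: `Batch` and `print_streaming` are hypothetical names, a plain synchronous iterator stands in for the async record batch stream, and `Option<usize>` stands in for the `maxrows` option. Each batch is printed as it arrives and then dropped, while the row count keeps running so a footer could still report the total.

```rust
use std::io::Write;

// Hypothetical stand-in for an Arrow record batch: each element is one row.
type Batch = Vec<i64>;

/// Prints rows as each batch arrives, holding at most one batch at a time.
/// `max_rows` of `None` means no limit (an assumption mirroring an
/// "unlimited" setting). Returns the total number of rows seen, including
/// rows that were not printed because the limit had been reached.
fn print_streaming<I, W>(
    batches: I,
    max_rows: Option<usize>,
    out: &mut W,
) -> Result<usize, String>
where
    I: IntoIterator<Item = Result<Batch, String>>,
    W: Write,
{
    let mut row_count = 0usize;
    for batch in batches {
        // Propagate a failed batch to the caller, as `batch?` does in the diff.
        let batch = batch?;
        // Number of rows from this batch that still fit under the limit.
        let remaining = match max_rows {
            Some(limit) => limit.saturating_sub(row_count),
            None => batch.len(),
        };
        for row in batch.iter().take(remaining) {
            writeln!(out, "{}", row).map_err(|e| e.to_string())?;
        }
        // Keep counting past the limit so the total can be reported later.
        row_count += batch.len();
    }
    Ok(row_count)
}
```

Called with batches `[1, 2, 3]`, `[4, 5]`, `[6]` and a limit of 4, this writes the first four rows and returns 6 as the total. Because no batch outlives its loop iteration, a sketch like this would not need the memory reservation that the collecting version grows per batch. A real table printer would also have to decide column widths before seeing all rows, which this sketch ignores.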