zhuqi-lucas commented on code in PR #14954: URL: https://github.com/apache/datafusion/pull/14954#discussion_r1980626645
##########
datafusion-cli/src/print_options.rs:
##########
@@ -127,47 +236,120 @@ impl PrintOptions {
         Ok(())
     }
-    /// Print the stream to stdout using the specified format
-    pub async fn print_stream(
+    /// Print the stream to stdout using the format which is not table format
+    pub async fn print_no_table_streaming_batch<W: std::io::Write>(
         &self,
-        mut stream: Pin<Box<dyn RecordBatchStream>>,
-        query_start_time: Instant,
+        stream: &mut SendableRecordBatchStream,
+        writer: &mut W,
+        now: Instant,
     ) -> Result<()> {
-        if self.format == PrintFormat::Table {
-            return Err(DataFusionError::External(
-                "PrintFormat::Table is not implemented".to_string().into(),
-            ));
+        let max_count = match self.maxrows {
+            MaxRows::Unlimited => usize::MAX,
+            MaxRows::Limited(n) => n,
         };
-        let stdout = std::io::stdout();
-        let mut writer = stdout.lock();
-
         let mut row_count = 0_usize;
         let mut with_header = true;
+        let mut max_rows_reached = false;
         while let Some(maybe_batch) = stream.next().await {
             let batch = maybe_batch?;
-            row_count += batch.num_rows();
-            self.format.print_batches(
-                &mut writer,
-                batch.schema(),
-                &[batch],
-                MaxRows::Unlimited,
-                with_header,
-            )?;
+            let curr_batch_rows = batch.num_rows();
+            if !max_rows_reached && row_count < max_count {
+                if row_count + curr_batch_rows > max_count {
+                    let needed = max_count - row_count;
+                    let batch_to_print = batch.slice(0, needed);
+                    self.format.print_no_table_batches(
+                        writer,
+                        batch.schema(),
+                        &[batch_to_print],
+                        with_header,
+                    )?;
+                    max_rows_reached = true;
+                } else {
+                    self.format.print_no_table_batches(
+                        writer,
+                        batch.schema(),
+                        &[batch],
+                        with_header,
+                    )?;
+                }
+            }
+            row_count += curr_batch_rows;
             with_header = false;
         }
-        let formatted_exec_details = get_execution_details_formatted(
-            row_count,
-            MaxRows::Unlimited,
-            query_start_time,
-        );
+        let formatted_exec_details = self.get_execution_details_formatted(row_count, now);
         if !self.quiet {
             writeln!(writer, "{formatted_exec_details}")?;
         }
         Ok(())
     }
+
+    /// Print the stream to stdout using the specified format
+    /// There are two modes of operation:
+    /// 1. If the format is table, the stream is processed in batches and previewed to determine the column widths
+    /// before printing the full result set. And after we have the column widths, we print batch by batch with the correct widths.
+    ///
+    /// 2. If the format is not table, the stream is processed batch by batch and printed immediately.
+    ///
+    /// The query_start_time is used to calculate the elapsed time for the query.
+    /// The schema is used to print the header.
+    pub async fn print_stream(

Review Comment:
Unified to one API for all cases.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org