This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fba5cc0b90 Streaming CLI support (#8651)
fba5cc0b90 is described below

commit fba5cc0b9062297e38cbe388d7f1b13debe8ba92
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu Dec 28 15:27:21 2023 +0300

    Streaming CLI support (#8651)
    
    * Streaming CLI support
    
    * Update Cargo.toml
    
    * Remove duplications
    
    * Clean up
    
    * Stream test will be added
    
    * Update print_format.rs
    
    * Address feedback
    
    * Final fix
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 Cargo.toml                                         |   2 +-
 datafusion-cli/Cargo.lock                          |   1 +
 datafusion-cli/Cargo.toml                          |   1 +
 datafusion-cli/src/exec.rs                         |  66 ++---
 datafusion-cli/src/main.rs                         |  19 +-
 datafusion-cli/src/print_format.rs                 | 278 +++++++++++++--------
 datafusion-cli/src/print_options.rs                |  74 +++++-
 .../core/src/datasource/physical_plan/mod.rs       |  15 ++
 8 files changed, 295 insertions(+), 161 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index a698fbf471..4ee29ea629 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -36,7 +36,7 @@ arrow = { version = "49.0.0", features = ["prettyprint"] }
 arrow-array = { version = "49.0.0", default-features = false, features = ["chrono-tz"] }
 arrow-buffer = { version = "49.0.0", default-features = false }
 arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] }
-arrow-ipc = { version = "49.0.0", default-features = false, features=["lz4"] }
+arrow-ipc = { version = "49.0.0", default-features = false, features = ["lz4"] }
 arrow-ord = { version = "49.0.0", default-features = false }
 arrow-schema = { version = "49.0.0", default-features = false }
 async-trait = "0.1.73"
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 9f75013c86..8e9bbd8a0d 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1160,6 +1160,7 @@ dependencies = [
  "datafusion-common",
  "dirs",
  "env_logger",
+ "futures",
  "mimalloc",
  "object_store",
  "parking_lot",
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index f570976836..e1ddba4cad 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -38,6 +38,7 @@ datafusion = { path = "../datafusion/core", version = "34.0.0", features = ["avr
 datafusion-common = { path = "../datafusion/common" }
 dirs = "4.0.0"
 env_logger = "0.9"
+futures = "0.3"
 mimalloc = { version = "0.1", default-features = false }
 object_store = { version = "0.8.0", features = ["aws", "gcp"] }
 parking_lot = { version = "0.12" }
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index 8af534cd13..ba9aa2e69a 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -17,6 +17,12 @@
 
 //! Execution functions
 
+use std::io::prelude::*;
+use std::io::BufReader;
+use std::time::Instant;
+use std::{fs::File, sync::Arc};
+
+use crate::print_format::PrintFormat;
 use crate::{
     command::{Command, OutputFormat},
     helper::{unescape_input, CliHelper},
@@ -26,21 +32,19 @@ use crate::{
     },
     print_options::{MaxRows, PrintOptions},
 };
-use datafusion::common::plan_datafusion_err;
+
+use datafusion::common::{exec_datafusion_err, plan_datafusion_err};
+use datafusion::datasource::listing::ListingTableUrl;
+use datafusion::datasource::physical_plan::is_plan_streaming;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
+use datafusion::physical_plan::{collect, execute_stream};
+use datafusion::prelude::SessionContext;
 use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
-use datafusion::{
-    datasource::listing::ListingTableUrl,
-    error::{DataFusionError, Result},
-    logical_expr::{CreateExternalTable, DdlStatement},
-};
-use datafusion::{logical_expr::LogicalPlan, prelude::SessionContext};
+
 use object_store::ObjectStore;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
-use std::io::prelude::*;
-use std::io::BufReader;
-use std::time::Instant;
-use std::{fs::File, sync::Arc};
 use url::Url;
 
 /// run and execute SQL statements and commands, against a context with the given print options
@@ -125,8 +129,6 @@ pub async fn exec_from_repl(
     )));
     rl.load_history(".history").ok();
 
-    let mut print_options = print_options.clone();
-
     loop {
         match rl.readline("❯ ") {
             Ok(line) if line.starts_with('\\') => {
@@ -138,9 +140,7 @@ pub async fn exec_from_repl(
                         Command::OutputFormat(subcommand) => {
                             if let Some(subcommand) = subcommand {
                                 if let Ok(command) = subcommand.parse::<OutputFormat>() {
-                                    if let Err(e) =
-                                        command.execute(&mut print_options).await
-                                    {
+                                    if let Err(e) = command.execute(print_options).await {
                                         eprintln!("{e}")
                                     }
                                 } else {
@@ -154,7 +154,7 @@ pub async fn exec_from_repl(
                             }
                         }
                         _ => {
-                            if let Err(e) = cmd.execute(ctx, &mut print_options).await {
+                            if let Err(e) = cmd.execute(ctx, print_options).await {
                                 eprintln!("{e}")
                             }
                         }
@@ -165,7 +165,7 @@ pub async fn exec_from_repl(
             }
             Ok(line) => {
                 rl.add_history_entry(line.trim_end())?;
-                match exec_and_print(ctx, &print_options, line).await {
+                match exec_and_print(ctx, print_options, line).await {
                     Ok(_) => {}
                     Err(err) => eprintln!("{err}"),
                 }
@@ -198,7 +198,6 @@ async fn exec_and_print(
     sql: String,
 ) -> Result<()> {
     let now = Instant::now();
-
     let sql = unescape_input(&sql)?;
     let task_ctx = ctx.task_ctx();
     let dialect = &task_ctx.session_config().options().sql_parser.dialect;
@@ -227,18 +226,24 @@ async fn exec_and_print(
         if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
             create_external_table(ctx, cmd).await?;
         }
+
         let df = ctx.execute_logical_plan(plan).await?;
-        let results = df.collect().await?;
+        let physical_plan = df.create_physical_plan().await?;
 
-        let print_options = if should_ignore_maxrows {
-            PrintOptions {
-                maxrows: MaxRows::Unlimited,
-                ..print_options.clone()
-            }
+        if is_plan_streaming(&physical_plan)? {
+            let stream = execute_stream(physical_plan, task_ctx.clone())?;
+            print_options.print_stream(stream, now).await?;
         } else {
-            print_options.clone()
-        };
-        print_options.print_batches(&results, now)?;
+            let mut print_options = print_options.clone();
+            if should_ignore_maxrows {
+                print_options.maxrows = MaxRows::Unlimited;
+            }
+            if print_options.format == PrintFormat::Automatic {
+                print_options.format = PrintFormat::Table;
+            }
+            let results = collect(physical_plan, task_ctx.clone()).await?;
+            print_options.print_batches(&results, now)?;
+        }
     }
 
     Ok(())
@@ -272,10 +277,7 @@ async fn create_external_table(
                 .object_store_registry
                 .get_store(url)
                 .map_err(|_| {
-                    DataFusionError::Execution(format!(
-                        "Unsupported object store scheme: {}",
-                        scheme
-                    ))
+                    exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
                 })?
         }
     };
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 8b74a797b5..563d172f2c 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -15,7 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use clap::Parser;
+use std::collections::HashMap;
+use std::env;
+use std::path::Path;
+use std::str::FromStr;
+use std::sync::{Arc, OnceLock};
+
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionConfig;
 use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
@@ -29,12 +34,9 @@ use datafusion_cli::{
     print_options::{MaxRows, PrintOptions},
     DATAFUSION_CLI_VERSION,
 };
+
+use clap::Parser;
 use mimalloc::MiMalloc;
-use std::collections::HashMap;
-use std::env;
-use std::path::Path;
-use std::str::FromStr;
-use std::sync::{Arc, OnceLock};
 
 #[global_allocator]
 static GLOBAL: MiMalloc = MiMalloc;
@@ -111,7 +113,7 @@ struct Args {
     )]
     rc: Option<Vec<String>>,
 
-    #[clap(long, arg_enum, default_value_t = PrintFormat::Table)]
+    #[clap(long, arg_enum, default_value_t = PrintFormat::Automatic)]
     format: PrintFormat,
 
     #[clap(
@@ -331,9 +333,8 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
 
 #[cfg(test)]
 mod tests {
-    use datafusion::assert_batches_eq;
-
     use super::*;
+    use datafusion::assert_batches_eq;
 
     fn assert_conversion(input: &str, expected: Result<usize, String>) {
         let result = extract_memory_pool_size(input);
diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs
index 0738bf6f9b..ea41856249 100644
--- a/datafusion-cli/src/print_format.rs
+++ b/datafusion-cli/src/print_format.rs
@@ -16,23 +16,27 @@
 // under the License.
 
 //! Print format variants
+
+use std::str::FromStr;
+
 use crate::print_options::MaxRows;
+
 use arrow::csv::writer::WriterBuilder;
 use arrow::json::{ArrayWriter, LineDelimitedWriter};
+use arrow::record_batch::RecordBatch;
 use arrow::util::pretty::pretty_format_batches_with_options;
-use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::common::format::DEFAULT_FORMAT_OPTIONS;
-use datafusion::error::{DataFusionError, Result};
-use std::str::FromStr;
+use datafusion::error::Result;
 
 /// Allow records to be printed in different formats
-#[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone)]
+#[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone, Copy)]
 pub enum PrintFormat {
     Csv,
     Tsv,
     Table,
     Json,
     NdJson,
+    Automatic,
 }
 
 impl FromStr for PrintFormat {
@@ -44,31 +48,44 @@ impl FromStr for PrintFormat {
 }
 
 macro_rules! batches_to_json {
-    ($WRITER: ident, $batches: expr) => {{
-        let mut bytes = vec![];
+    ($WRITER: ident, $writer: expr, $batches: expr) => {{
         {
-            let mut writer = $WRITER::new(&mut bytes);
-            $batches.iter().try_for_each(|batch| writer.write(batch))?;
-            writer.finish()?;
+            if !$batches.is_empty() {
+                let mut json_writer = $WRITER::new(&mut *$writer);
+                for batch in $batches {
+                    json_writer.write(batch)?;
+                }
+                json_writer.finish()?;
+                json_finish!($WRITER, $writer);
+            }
         }
-        String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))?
+        Ok(()) as Result<()>
     }};
 }
 
-fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result<String> {
-    let mut bytes = vec![];
-    {
-        let builder = WriterBuilder::new()
-            .with_header(true)
-            .with_delimiter(delimiter);
-        let mut writer = builder.build(&mut bytes);
-        for batch in batches {
-            writer.write(batch)?;
-        }
+macro_rules! json_finish {
+    (ArrayWriter, $writer: expr) => {{
+        writeln!($writer)?;
+    }};
+    (LineDelimitedWriter, $writer: expr) => {{}};
+}
+
+fn print_batches_with_sep<W: std::io::Write>(
+    writer: &mut W,
+    batches: &[RecordBatch],
+    delimiter: u8,
+    with_header: bool,
+) -> Result<()> {
+    let builder = WriterBuilder::new()
+        .with_header(with_header)
+        .with_delimiter(delimiter);
+    let mut csv_writer = builder.build(writer);
+
+    for batch in batches {
+        csv_writer.write(batch)?;
     }
-    let formatted =
-        String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))?;
-    Ok(formatted)
+
+    Ok(())
 }
 
 fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
@@ -88,97 +105,118 @@ fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
     result.join("\n")
 }
 
-fn format_batches_with_maxrows(
+fn format_batches_with_maxrows<W: std::io::Write>(
+    writer: &mut W,
     batches: &[RecordBatch],
     maxrows: MaxRows,
-) -> Result<String> {
+) -> Result<()> {
     match maxrows {
         MaxRows::Limited(maxrows) => {
-            // Only format enough batches for maxrows
+            // Filter batches to meet the maxrows condition
             let mut filtered_batches = Vec::new();
-            let mut batches = batches;
-            let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
-            if row_count > maxrows {
-                let mut accumulated_rows = 0;
-
-                for batch in batches {
+            let mut row_count: usize = 0;
+            let mut over_limit = false;
+            for batch in batches {
+                if row_count + batch.num_rows() > maxrows {
+                    // If adding this batch exceeds maxrows, slice the batch
+                    let limit = maxrows - row_count;
+                    let sliced_batch = batch.slice(0, limit);
+                    filtered_batches.push(sliced_batch);
+                    over_limit = true;
+                    break;
+                } else {
                     filtered_batches.push(batch.clone());
-                    if accumulated_rows + batch.num_rows() > maxrows {
-                        break;
-                    }
-                    accumulated_rows += batch.num_rows();
+                    row_count += batch.num_rows();
                 }
-
-                batches = &filtered_batches;
             }
 
-            let mut formatted = format!(
-                "{}",
-                pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?,
-            );
-
-            if row_count > maxrows {
-                formatted = keep_only_maxrows(&formatted, maxrows);
+            let formatted = pretty_format_batches_with_options(
+                &filtered_batches,
+                &DEFAULT_FORMAT_OPTIONS,
+            )?;
+            if over_limit {
+                let mut formatted_str = format!("{}", formatted);
+                formatted_str = keep_only_maxrows(&formatted_str, maxrows);
+                writeln!(writer, "{}", formatted_str)?;
+            } else {
+                writeln!(writer, "{}", formatted)?;
             }
-
-            Ok(formatted)
         }
         MaxRows::Unlimited => {
-            // maxrows not specified, print all rows
-            Ok(format!(
-                "{}",
-                pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?,
-            ))
+            let formatted =
+                pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?;
+            writeln!(writer, "{}", formatted)?;
         }
     }
+
+    Ok(())
 }
 
 impl PrintFormat {
-    /// print the batches to stdout using the specified format
-    /// `maxrows` option is only used for `Table` format:
-    ///     If `maxrows` is Some(n), then at most n rows will be displayed
-    ///     If `maxrows` is None, then every row will be displayed
-    pub fn print_batches(&self, batches: &[RecordBatch], maxrows: MaxRows) -> Result<()> {
-        if batches.is_empty() {
+    /// Print the batches to a writer using the specified format
+    pub fn print_batches<W: std::io::Write>(
+        &self,
+        writer: &mut W,
+        batches: &[RecordBatch],
+        maxrows: MaxRows,
+        with_header: bool,
+    ) -> Result<()> {
+        if batches.is_empty() || batches[0].num_rows() == 0 {
             return Ok(());
         }
 
         match self {
-            Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?),
-            Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?),
+            Self::Csv | Self::Automatic => {
+                print_batches_with_sep(writer, batches, b',', with_header)
+            }
+            Self::Tsv => print_batches_with_sep(writer, batches, b'\t', with_header),
             Self::Table => {
                 if maxrows == MaxRows::Limited(0) {
                     return Ok(());
                 }
-                println!("{}", format_batches_with_maxrows(batches, maxrows)?,)
-            }
-            Self::Json => println!("{}", batches_to_json!(ArrayWriter, batches)),
-            Self::NdJson => {
-                println!("{}", batches_to_json!(LineDelimitedWriter, batches))
+                format_batches_with_maxrows(writer, batches, maxrows)
             }
+            Self::Json => batches_to_json!(ArrayWriter, writer, batches),
+            Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, batches),
         }
-        Ok(())
     }
 }
 
 #[cfg(test)]
 mod tests {
+    use std::io::{Cursor, Read, Write};
+    use std::sync::Arc;
+
     use super::*;
+
     use arrow::array::Int32Array;
     use arrow::datatypes::{DataType, Field, Schema};
-    use std::sync::Arc;
+    use datafusion::error::Result;
+
+    fn run_test<F>(batches: &[RecordBatch], test_fn: F) -> Result<String>
+    where
+        F: Fn(&mut Cursor<Vec<u8>>, &[RecordBatch]) -> Result<()>,
+    {
+        let mut buffer = Cursor::new(Vec::new());
+        test_fn(&mut buffer, batches)?;
+        buffer.set_position(0);
+        let mut contents = String::new();
+        buffer.read_to_string(&mut contents)?;
+        Ok(contents)
+    }
 
     #[test]
-    fn test_print_batches_with_sep() {
-        let batches = vec![];
-        assert_eq!("", print_batches_with_sep(&batches, b',').unwrap());
+    fn test_print_batches_with_sep() -> Result<()> {
+        let contents = run_test(&[], |buffer, batches| {
+            print_batches_with_sep(buffer, batches, b',', true)
+        })?;
+        assert_eq!(contents, "");
 
         let schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::Int32, false),
             Field::new("b", DataType::Int32, false),
             Field::new("c", DataType::Int32, false),
         ]));
-
         let batch = RecordBatch::try_new(
             schema,
             vec![
@@ -186,29 +224,33 @@ mod tests {
                 Arc::new(Int32Array::from(vec![4, 5, 6])),
                 Arc::new(Int32Array::from(vec![7, 8, 9])),
             ],
-        )
-        .unwrap();
+        )?;
 
-        let batches = vec![batch];
-        let r = print_batches_with_sep(&batches, b',').unwrap();
-        assert_eq!("a,b,c\n1,4,7\n2,5,8\n3,6,9\n", r);
+        let contents = run_test(&[batch], |buffer, batches| {
+            print_batches_with_sep(buffer, batches, b',', true)
+        })?;
+        assert_eq!(contents, "a,b,c\n1,4,7\n2,5,8\n3,6,9\n");
+
+        Ok(())
     }
 
     #[test]
     fn test_print_batches_to_json_empty() -> Result<()> {
-        let batches = vec![];
-        let r = batches_to_json!(ArrayWriter, &batches);
-        assert_eq!("", r);
+        let contents = run_test(&[], |buffer, batches| {
+            batches_to_json!(ArrayWriter, buffer, batches)
+        })?;
+        assert_eq!(contents, "");
 
-        let r = batches_to_json!(LineDelimitedWriter, &batches);
-        assert_eq!("", r);
+        let contents = run_test(&[], |buffer, batches| {
+            batches_to_json!(LineDelimitedWriter, buffer, batches)
+        })?;
+        assert_eq!(contents, "");
 
         let schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::Int32, false),
             Field::new("b", DataType::Int32, false),
             Field::new("c", DataType::Int32, false),
         ]));
-
         let batch = RecordBatch::try_new(
             schema,
             vec![
@@ -216,25 +258,29 @@ mod tests {
                 Arc::new(Int32Array::from(vec![4, 5, 6])),
                 Arc::new(Int32Array::from(vec![7, 8, 9])),
             ],
-        )
-        .unwrap();
-
+        )?;
         let batches = vec![batch];
-        let r = batches_to_json!(ArrayWriter, &batches);
-        assert_eq!("[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]", r);
 
-        let r = batches_to_json!(LineDelimitedWriter, &batches);
-        assert_eq!("{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n", r);
+        let contents = run_test(&batches, |buffer, batches| {
+            batches_to_json!(ArrayWriter, buffer, batches)
+        })?;
+        assert_eq!(contents, "[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]\n");
+
+        let contents = run_test(&batches, |buffer, batches| {
+            batches_to_json!(LineDelimitedWriter, buffer, batches)
+        })?;
+        assert_eq!(contents, "{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n");
+
         Ok(())
     }
 
     #[test]
     fn test_format_batches_with_maxrows() -> Result<()> {
         let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
-
-        let batch =
-            RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
-                .unwrap();
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
+        )?;
 
         #[rustfmt::skip]
         let all_rows_expected = [
@@ -244,7 +290,7 @@ mod tests {
             "| 1 |",
             "| 2 |",
             "| 3 |",
-            "+---+",
+            "+---+\n",
         ].join("\n");
 
         #[rustfmt::skip]
@@ -256,7 +302,7 @@ mod tests {
             "| . |",
             "| . |",
             "| . |",
-            "+---+",
+            "+---+\n",
         ].join("\n");
 
         #[rustfmt::skip]
@@ -272,26 +318,36 @@ mod tests {
             "| . |",
             "| . |",
             "| . |",
-            "+---+",
+            "+---+\n",
         ].join("\n");
 
-        let no_limit = format_batches_with_maxrows(&[batch.clone()], MaxRows::Unlimited)?;
-        assert_eq!(all_rows_expected, no_limit);
-
-        let maxrows_less_than_actual =
-            format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(1))?;
-        assert_eq!(one_row_expected, maxrows_less_than_actual);
-        let maxrows_more_than_actual =
-            format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(5))?;
-        assert_eq!(all_rows_expected, maxrows_more_than_actual);
-        let maxrows_equals_actual =
-            format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(3))?;
-        assert_eq!(all_rows_expected, maxrows_equals_actual);
-        let multi_batches = format_batches_with_maxrows(
+        let no_limit = run_test(&[batch.clone()], |buffer, batches| {
+            format_batches_with_maxrows(buffer, batches, MaxRows::Unlimited)
+        })?;
+        assert_eq!(no_limit, all_rows_expected);
+
+        let maxrows_less_than_actual = run_test(&[batch.clone()], |buffer, batches| {
+            format_batches_with_maxrows(buffer, batches, MaxRows::Limited(1))
+        })?;
+        assert_eq!(maxrows_less_than_actual, one_row_expected);
+
+        let maxrows_more_than_actual = run_test(&[batch.clone()], |buffer, batches| {
+            format_batches_with_maxrows(buffer, batches, MaxRows::Limited(5))
+        })?;
+        assert_eq!(maxrows_more_than_actual, all_rows_expected);
+
+        let maxrows_equals_actual = run_test(&[batch.clone()], |buffer, batches| {
+            format_batches_with_maxrows(buffer, batches, MaxRows::Limited(3))
+        })?;
+        assert_eq!(maxrows_equals_actual, all_rows_expected);
+
+        let multi_batches = run_test(
             &[batch.clone(), batch.clone(), batch.clone()],
-            MaxRows::Limited(5),
+            |buffer, batches| {
+                format_batches_with_maxrows(buffer, batches, MaxRows::Limited(5))
+            },
         )?;
-        assert_eq!(multi_batches_expected, multi_batches);
+        assert_eq!(multi_batches, multi_batches_expected);
 
         Ok(())
     }
diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs
index 0a6c8d4c36..b8594352b5 100644
--- a/datafusion-cli/src/print_options.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -15,13 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::print_format::PrintFormat;
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::error::Result;
 use std::fmt::{Display, Formatter};
+use std::io::Write;
+use std::pin::Pin;
 use std::str::FromStr;
 use std::time::Instant;
 
+use crate::print_format::PrintFormat;
+
+use arrow::record_batch::RecordBatch;
+use datafusion::common::DataFusionError;
+use datafusion::error::Result;
+use datafusion::physical_plan::RecordBatchStream;
+
+use futures::StreamExt;
+
 #[derive(Debug, Clone, PartialEq, Copy)]
 pub enum MaxRows {
     /// show all rows in the output
@@ -85,20 +93,70 @@ fn get_timing_info_str(
 }
 
 impl PrintOptions {
-    /// print the batches to stdout using the specified format
+    /// Print the batches to stdout using the specified format
     pub fn print_batches(
         &self,
         batches: &[RecordBatch],
         query_start_time: Instant,
     ) -> Result<()> {
+        let stdout = std::io::stdout();
+        let mut writer = stdout.lock();
+
+        self.format
+            .print_batches(&mut writer, batches, self.maxrows, true)?;
+
         let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
-        // Elapsed time should not count time for printing batches
-        let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time);
+        let timing_info = get_timing_info_str(
+            row_count,
+            if self.format == PrintFormat::Table {
+                self.maxrows
+            } else {
+                MaxRows::Unlimited
+            },
+            query_start_time,
+        );
+
+        if !self.quiet {
+            writeln!(writer, "{timing_info}")?;
+        }
+
+        Ok(())
+    }
+
+    /// Print the stream to stdout using the specified format
+    pub async fn print_stream(
+        &self,
+        mut stream: Pin<Box<dyn RecordBatchStream>>,
+        query_start_time: Instant,
+    ) -> Result<()> {
+        if self.format == PrintFormat::Table {
+            return Err(DataFusionError::External(
+                "PrintFormat::Table is not implemented".to_string().into(),
+            ));
+        };
+
+        let stdout = std::io::stdout();
+        let mut writer = stdout.lock();
+
+        let mut row_count = 0_usize;
+        let mut with_header = true;
+
+        while let Some(Ok(batch)) = stream.next().await {
+            row_count += batch.num_rows();
+            self.format.print_batches(
+                &mut writer,
+                &[batch],
+                MaxRows::Unlimited,
+                with_header,
+            )?;
+            with_header = false;
+        }
 
-        self.format.print_batches(batches, self.maxrows)?;
+        let timing_info =
+            get_timing_info_str(row_count, MaxRows::Unlimited, query_start_time);
 
         if !self.quiet {
-            println!("{timing_info}");
+            writeln!(writer, "{timing_info}")?;
         }
 
         Ok(())
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 4a6ebeab09..5583991355 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -69,6 +69,7 @@ use arrow::{
 use datafusion_common::{file_options::FileTypeWriterOptions, plan_err};
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_plan::ExecutionPlan;
 
 use log::debug;
 use object_store::path::Path;
@@ -507,6 +508,20 @@ fn get_projected_output_ordering(
     all_orderings
 }
 
+/// Get output (un)boundedness information for the given `plan`.
+pub fn is_plan_streaming(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
+    if plan.children().is_empty() {
+        plan.unbounded_output(&[])
+    } else {
+        let children_unbounded_output = plan
+            .children()
+            .iter()
+            .map(is_plan_streaming)
+            .collect::<Result<Vec<_>>>();
+        plan.unbounded_output(&children_unbounded_output?)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use arrow_array::cast::AsArray;

Reply via email to