This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fba5cc0b90 Streaming CLI support (#8651)
fba5cc0b90 is described below
commit fba5cc0b9062297e38cbe388d7f1b13debe8ba92
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu Dec 28 15:27:21 2023 +0300
Streaming CLI support (#8651)
* Streaming CLI support
* Update Cargo.toml
* Remove duplications
* Clean up
* Stream test will be added
* Update print_format.rs
* Address feedback
* Final fix
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
Cargo.toml | 2 +-
datafusion-cli/Cargo.lock | 1 +
datafusion-cli/Cargo.toml | 1 +
datafusion-cli/src/exec.rs | 66 ++---
datafusion-cli/src/main.rs | 19 +-
datafusion-cli/src/print_format.rs | 278 +++++++++++++--------
datafusion-cli/src/print_options.rs | 74 +++++-
.../core/src/datasource/physical_plan/mod.rs | 15 ++
8 files changed, 295 insertions(+), 161 deletions(-)
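
The heart of the change is in exec_and_print (datafusion-cli/src/exec.rs):
unbounded plans are now executed incrementally instead of being collected.
Condensed from the diff below (a sketch of the new control flow, not the
verbatim patch):

    let df = ctx.execute_logical_plan(plan).await?;
    let physical_plan = df.create_physical_plan().await?;

    if is_plan_streaming(&physical_plan)? {
        // Unbounded input: print each batch as it arrives.
        let stream = execute_stream(physical_plan, task_ctx.clone())?;
        print_options.print_stream(stream, now).await?;
    } else {
        // Bounded input: collect fully, then print once.
        let results = collect(physical_plan, task_ctx.clone()).await?;
        print_options.print_batches(&results, now)?;
    }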
diff --git a/Cargo.toml b/Cargo.toml
index a698fbf471..4ee29ea629 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -36,7 +36,7 @@ arrow = { version = "49.0.0", features = ["prettyprint"] }
arrow-array = { version = "49.0.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { version = "49.0.0", default-features = false }
arrow-flight = { version = "49.0.0", features = ["flight-sql-experimental"] }
-arrow-ipc = { version = "49.0.0", default-features = false, features=["lz4"] }
+arrow-ipc = { version = "49.0.0", default-features = false, features = ["lz4"] }
arrow-ord = { version = "49.0.0", default-features = false }
arrow-schema = { version = "49.0.0", default-features = false }
async-trait = "0.1.73"
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 9f75013c86..8e9bbd8a0d 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1160,6 +1160,7 @@ dependencies = [
"datafusion-common",
"dirs",
"env_logger",
+ "futures",
"mimalloc",
"object_store",
"parking_lot",
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index f570976836..e1ddba4cad 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -38,6 +38,7 @@ datafusion = { path = "../datafusion/core", version = "34.0.0", features = ["avr
datafusion-common = { path = "../datafusion/common" }
dirs = "4.0.0"
env_logger = "0.9"
+futures = "0.3"
mimalloc = { version = "0.1", default-features = false }
object_store = { version = "0.8.0", features = ["aws", "gcp"] }
parking_lot = { version = "0.12" }
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index 8af534cd13..ba9aa2e69a 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -17,6 +17,12 @@
//! Execution functions
+use std::io::prelude::*;
+use std::io::BufReader;
+use std::time::Instant;
+use std::{fs::File, sync::Arc};
+
+use crate::print_format::PrintFormat;
use crate::{
command::{Command, OutputFormat},
helper::{unescape_input, CliHelper},
@@ -26,21 +32,19 @@ use crate::{
},
print_options::{MaxRows, PrintOptions},
};
-use datafusion::common::plan_datafusion_err;
+
+use datafusion::common::{exec_datafusion_err, plan_datafusion_err};
+use datafusion::datasource::listing::ListingTableUrl;
+use datafusion::datasource::physical_plan::is_plan_streaming;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::logical_expr::{CreateExternalTable, DdlStatement, LogicalPlan};
+use datafusion::physical_plan::{collect, execute_stream};
+use datafusion::prelude::SessionContext;
use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
-use datafusion::{
- datasource::listing::ListingTableUrl,
- error::{DataFusionError, Result},
- logical_expr::{CreateExternalTable, DdlStatement},
-};
-use datafusion::{logical_expr::LogicalPlan, prelude::SessionContext};
+
use object_store::ObjectStore;
use rustyline::error::ReadlineError;
use rustyline::Editor;
-use std::io::prelude::*;
-use std::io::BufReader;
-use std::time::Instant;
-use std::{fs::File, sync::Arc};
use url::Url;
/// run and execute SQL statements and commands, against a context with the given print options
@@ -125,8 +129,6 @@ pub async fn exec_from_repl(
)));
rl.load_history(".history").ok();
- let mut print_options = print_options.clone();
-
loop {
match rl.readline("❯ ") {
Ok(line) if line.starts_with('\\') => {
@@ -138,9 +140,7 @@ pub async fn exec_from_repl(
Command::OutputFormat(subcommand) => {
if let Some(subcommand) = subcommand {
if let Ok(command) = subcommand.parse::<OutputFormat>() {
- if let Err(e) =
- command.execute(&mut print_options).await
- {
+ if let Err(e) = command.execute(print_options).await {
eprintln!("{e}")
}
} else {
@@ -154,7 +154,7 @@ pub async fn exec_from_repl(
}
}
_ => {
- if let Err(e) = cmd.execute(ctx, &mut print_options).await {
+ if let Err(e) = cmd.execute(ctx, print_options).await {
eprintln!("{e}")
}
}
@@ -165,7 +165,7 @@ pub async fn exec_from_repl(
}
Ok(line) => {
rl.add_history_entry(line.trim_end())?;
- match exec_and_print(ctx, &print_options, line).await {
+ match exec_and_print(ctx, print_options, line).await {
Ok(_) => {}
Err(err) => eprintln!("{err}"),
}
@@ -198,7 +198,6 @@ async fn exec_and_print(
sql: String,
) -> Result<()> {
let now = Instant::now();
-
let sql = unescape_input(&sql)?;
let task_ctx = ctx.task_ctx();
let dialect = &task_ctx.session_config().options().sql_parser.dialect;
@@ -227,18 +226,24 @@ async fn exec_and_print(
if let LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) = &mut plan {
create_external_table(ctx, cmd).await?;
}
+
let df = ctx.execute_logical_plan(plan).await?;
- let results = df.collect().await?;
+ let physical_plan = df.create_physical_plan().await?;
- let print_options = if should_ignore_maxrows {
- PrintOptions {
- maxrows: MaxRows::Unlimited,
- ..print_options.clone()
- }
+ if is_plan_streaming(&physical_plan)? {
+ let stream = execute_stream(physical_plan, task_ctx.clone())?;
+ print_options.print_stream(stream, now).await?;
} else {
- print_options.clone()
- };
- print_options.print_batches(&results, now)?;
+ let mut print_options = print_options.clone();
+ if should_ignore_maxrows {
+ print_options.maxrows = MaxRows::Unlimited;
+ }
+ if print_options.format == PrintFormat::Automatic {
+ print_options.format = PrintFormat::Table;
+ }
+ let results = collect(physical_plan, task_ctx.clone()).await?;
+ print_options.print_batches(&results, now)?;
+ }
}
Ok(())
@@ -272,10 +277,7 @@ async fn create_external_table(
.object_store_registry
.get_store(url)
.map_err(|_| {
- DataFusionError::Execution(format!(
- "Unsupported object store scheme: {}",
- scheme
- ))
+ exec_datafusion_err!("Unsupported object store scheme: {}", scheme)
})?
}
};
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 8b74a797b5..563d172f2c 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -15,7 +15,12 @@
// specific language governing permissions and limitations
// under the License.
-use clap::Parser;
+use std::collections::HashMap;
+use std::env;
+use std::path::Path;
+use std::str::FromStr;
+use std::sync::{Arc, OnceLock};
+
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
use datafusion::execution::memory_pool::{FairSpillPool, GreedyMemoryPool};
@@ -29,12 +34,9 @@ use datafusion_cli::{
print_options::{MaxRows, PrintOptions},
DATAFUSION_CLI_VERSION,
};
+
+use clap::Parser;
use mimalloc::MiMalloc;
-use std::collections::HashMap;
-use std::env;
-use std::path::Path;
-use std::str::FromStr;
-use std::sync::{Arc, OnceLock};
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
@@ -111,7 +113,7 @@ struct Args {
)]
rc: Option<Vec<String>>,
- #[clap(long, arg_enum, default_value_t = PrintFormat::Table)]
+ #[clap(long, arg_enum, default_value_t = PrintFormat::Automatic)]
format: PrintFormat,
#[clap(
@@ -331,9 +333,8 @@ fn extract_memory_pool_size(size: &str) -> Result<usize, String> {
#[cfg(test)]
mod tests {
- use datafusion::assert_batches_eq;
-
use super::*;
+ use datafusion::assert_batches_eq;
fn assert_conversion(input: &str, expected: Result<usize, String>) {
let result = extract_memory_pool_size(input);
diff --git a/datafusion-cli/src/print_format.rs b/datafusion-cli/src/print_format.rs
index 0738bf6f9b..ea41856249 100644
--- a/datafusion-cli/src/print_format.rs
+++ b/datafusion-cli/src/print_format.rs
@@ -16,23 +16,27 @@
// under the License.
//! Print format variants
+
+use std::str::FromStr;
+
use crate::print_options::MaxRows;
+
use arrow::csv::writer::WriterBuilder;
use arrow::json::{ArrayWriter, LineDelimitedWriter};
+use arrow::record_batch::RecordBatch;
use arrow::util::pretty::pretty_format_batches_with_options;
-use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::format::DEFAULT_FORMAT_OPTIONS;
-use datafusion::error::{DataFusionError, Result};
-use std::str::FromStr;
+use datafusion::error::Result;
/// Allow records to be printed in different formats
-#[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone)]
+#[derive(Debug, PartialEq, Eq, clap::ArgEnum, Clone, Copy)]
pub enum PrintFormat {
Csv,
Tsv,
Table,
Json,
NdJson,
+ Automatic,
}
impl FromStr for PrintFormat {
@@ -44,31 +48,44 @@ impl FromStr for PrintFormat {
}
macro_rules! batches_to_json {
- ($WRITER: ident, $batches: expr) => {{
- let mut bytes = vec![];
+ ($WRITER: ident, $writer: expr, $batches: expr) => {{
{
- let mut writer = $WRITER::new(&mut bytes);
- $batches.iter().try_for_each(|batch| writer.write(batch))?;
- writer.finish()?;
+ if !$batches.is_empty() {
+ let mut json_writer = $WRITER::new(&mut *$writer);
+ for batch in $batches {
+ json_writer.write(batch)?;
+ }
+ json_writer.finish()?;
+ json_finish!($WRITER, $writer);
+ }
}
- String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))?
+ Ok(()) as Result<()>
}};
}
-fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result<String> {
- let mut bytes = vec![];
- {
- let builder = WriterBuilder::new()
- .with_header(true)
- .with_delimiter(delimiter);
- let mut writer = builder.build(&mut bytes);
- for batch in batches {
- writer.write(batch)?;
- }
+macro_rules! json_finish {
+ (ArrayWriter, $writer: expr) => {{
+ writeln!($writer)?;
+ }};
+ (LineDelimitedWriter, $writer: expr) => {{}};
+}
+
+fn print_batches_with_sep<W: std::io::Write>(
+ writer: &mut W,
+ batches: &[RecordBatch],
+ delimiter: u8,
+ with_header: bool,
+) -> Result<()> {
+ let builder = WriterBuilder::new()
+ .with_header(with_header)
+ .with_delimiter(delimiter);
+ let mut csv_writer = builder.build(writer);
+
+ for batch in batches {
+ csv_writer.write(batch)?;
}
- let formatted =
- String::from_utf8(bytes).map_err(|e| DataFusionError::External(Box::new(e)))?;
- Ok(formatted)
+
+ Ok(())
}
fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
@@ -88,97 +105,118 @@ fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
result.join("\n")
}
-fn format_batches_with_maxrows(
+fn format_batches_with_maxrows<W: std::io::Write>(
+ writer: &mut W,
batches: &[RecordBatch],
maxrows: MaxRows,
-) -> Result<String> {
+) -> Result<()> {
match maxrows {
MaxRows::Limited(maxrows) => {
- // Only format enough batches for maxrows
+ // Filter batches to meet the maxrows condition
let mut filtered_batches = Vec::new();
- let mut batches = batches;
- let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
- if row_count > maxrows {
- let mut accumulated_rows = 0;
-
- for batch in batches {
+ let mut row_count: usize = 0;
+ let mut over_limit = false;
+ for batch in batches {
+ if row_count + batch.num_rows() > maxrows {
+ // If adding this batch exceeds maxrows, slice the batch
+ let limit = maxrows - row_count;
+ let sliced_batch = batch.slice(0, limit);
+ filtered_batches.push(sliced_batch);
+ over_limit = true;
+ break;
+ } else {
filtered_batches.push(batch.clone());
- if accumulated_rows + batch.num_rows() > maxrows {
- break;
- }
- accumulated_rows += batch.num_rows();
+ row_count += batch.num_rows();
}
-
- batches = &filtered_batches;
}
- let mut formatted = format!(
- "{}",
- pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?,
- );
-
- if row_count > maxrows {
- formatted = keep_only_maxrows(&formatted, maxrows);
+ let formatted = pretty_format_batches_with_options(
+ &filtered_batches,
+ &DEFAULT_FORMAT_OPTIONS,
+ )?;
+ if over_limit {
+ let mut formatted_str = format!("{}", formatted);
+ formatted_str = keep_only_maxrows(&formatted_str, maxrows);
+ writeln!(writer, "{}", formatted_str)?;
+ } else {
+ writeln!(writer, "{}", formatted)?;
}
-
- Ok(formatted)
}
MaxRows::Unlimited => {
- // maxrows not specified, print all rows
- Ok(format!(
- "{}",
- pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?,
- ))
+ let formatted =
+ pretty_format_batches_with_options(batches, &DEFAULT_FORMAT_OPTIONS)?;
+ writeln!(writer, "{}", formatted)?;
}
}
+
+ Ok(())
}
impl PrintFormat {
- /// print the batches to stdout using the specified format
- /// `maxrows` option is only used for `Table` format:
- /// If `maxrows` is Some(n), then at most n rows will be displayed
- /// If `maxrows` is None, then every row will be displayed
- pub fn print_batches(&self, batches: &[RecordBatch], maxrows: MaxRows) -> Result<()> {
- if batches.is_empty() {
+ /// Print the batches to a writer using the specified format
+ pub fn print_batches<W: std::io::Write>(
+ &self,
+ writer: &mut W,
+ batches: &[RecordBatch],
+ maxrows: MaxRows,
+ with_header: bool,
+ ) -> Result<()> {
+ if batches.is_empty() || batches[0].num_rows() == 0 {
return Ok(());
}
match self {
- Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?),
- Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?),
+ Self::Csv | Self::Automatic => {
+ print_batches_with_sep(writer, batches, b',', with_header)
+ }
+ Self::Tsv => print_batches_with_sep(writer, batches, b'\t', with_header),
Self::Table => {
if maxrows == MaxRows::Limited(0) {
return Ok(());
}
- println!("{}", format_batches_with_maxrows(batches, maxrows)?,)
- }
- Self::Json => println!("{}", batches_to_json!(ArrayWriter, batches)),
- Self::NdJson => {
- println!("{}", batches_to_json!(LineDelimitedWriter, batches))
+ format_batches_with_maxrows(writer, batches, maxrows)
}
+ Self::Json => batches_to_json!(ArrayWriter, writer, batches),
+ Self::NdJson => batches_to_json!(LineDelimitedWriter, writer, batches),
}
- Ok(())
}
}
#[cfg(test)]
mod tests {
+ use std::io::{Cursor, Read, Write};
+ use std::sync::Arc;
+
use super::*;
+
use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
- use std::sync::Arc;
+ use datafusion::error::Result;
+
+ fn run_test<F>(batches: &[RecordBatch], test_fn: F) -> Result<String>
+ where
+ F: Fn(&mut Cursor<Vec<u8>>, &[RecordBatch]) -> Result<()>,
+ {
+ let mut buffer = Cursor::new(Vec::new());
+ test_fn(&mut buffer, batches)?;
+ buffer.set_position(0);
+ let mut contents = String::new();
+ buffer.read_to_string(&mut contents)?;
+ Ok(contents)
+ }
#[test]
- fn test_print_batches_with_sep() {
- let batches = vec![];
- assert_eq!("", print_batches_with_sep(&batches, b',').unwrap());
+ fn test_print_batches_with_sep() -> Result<()> {
+ let contents = run_test(&[], |buffer, batches| {
+ print_batches_with_sep(buffer, batches, b',', true)
+ })?;
+ assert_eq!(contents, "");
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]));
-
let batch = RecordBatch::try_new(
schema,
vec![
@@ -186,29 +224,33 @@ mod tests {
Arc::new(Int32Array::from(vec![4, 5, 6])),
Arc::new(Int32Array::from(vec![7, 8, 9])),
],
- )
- .unwrap();
+ )?;
- let batches = vec![batch];
- let r = print_batches_with_sep(&batches, b',').unwrap();
- assert_eq!("a,b,c\n1,4,7\n2,5,8\n3,6,9\n", r);
+ let contents = run_test(&[batch], |buffer, batches| {
+ print_batches_with_sep(buffer, batches, b',', true)
+ })?;
+ assert_eq!(contents, "a,b,c\n1,4,7\n2,5,8\n3,6,9\n");
+
+ Ok(())
}
#[test]
fn test_print_batches_to_json_empty() -> Result<()> {
- let batches = vec![];
- let r = batches_to_json!(ArrayWriter, &batches);
- assert_eq!("", r);
+ let contents = run_test(&[], |buffer, batches| {
+ batches_to_json!(ArrayWriter, buffer, batches)
+ })?;
+ assert_eq!(contents, "");
- let r = batches_to_json!(LineDelimitedWriter, &batches);
- assert_eq!("", r);
+ let contents = run_test(&[], |buffer, batches| {
+ batches_to_json!(LineDelimitedWriter, buffer, batches)
+ })?;
+ assert_eq!(contents, "");
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]));
-
let batch = RecordBatch::try_new(
schema,
vec![
@@ -216,25 +258,29 @@ mod tests {
Arc::new(Int32Array::from(vec![4, 5, 6])),
Arc::new(Int32Array::from(vec![7, 8, 9])),
],
- )
- .unwrap();
-
+ )?;
let batches = vec![batch];
- let r = batches_to_json!(ArrayWriter, &batches);
-
- assert_eq!("[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]", r);
- let r = batches_to_json!(LineDelimitedWriter, &batches);
-
- assert_eq!("{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n", r);
+ let contents = run_test(&batches, |buffer, batches| {
+ batches_to_json!(ArrayWriter, buffer, batches)
+ })?;
+ assert_eq!(contents, "[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]\n");
+
+ let contents = run_test(&batches, |buffer, batches| {
+ batches_to_json!(LineDelimitedWriter, buffer, batches)
+ })?;
+ assert_eq!(contents, "{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n");
+
Ok(())
}
#[test]
fn test_format_batches_with_maxrows() -> Result<()> {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
-
- let batch =
- RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
- .unwrap();
+ let batch = RecordBatch::try_new(
+ schema,
+ vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
+ )?;
#[rustfmt::skip]
let all_rows_expected = [
@@ -244,7 +290,7 @@ mod tests {
"| 1 |",
"| 2 |",
"| 3 |",
- "+---+",
+ "+---+\n",
].join("\n");
#[rustfmt::skip]
@@ -256,7 +302,7 @@ mod tests {
"| . |",
"| . |",
"| . |",
- "+---+",
+ "+---+\n",
].join("\n");
#[rustfmt::skip]
@@ -272,26 +318,36 @@ mod tests {
"| . |",
"| . |",
"| . |",
- "+---+",
+ "+---+\n",
].join("\n");
- let no_limit = format_batches_with_maxrows(&[batch.clone()], MaxRows::Unlimited)?;
- assert_eq!(all_rows_expected, no_limit);
-
- let maxrows_less_than_actual =
- format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(1))?;
- assert_eq!(one_row_expected, maxrows_less_than_actual);
- let maxrows_more_than_actual =
- format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(5))?;
- assert_eq!(all_rows_expected, maxrows_more_than_actual);
- let maxrows_equals_actual =
- format_batches_with_maxrows(&[batch.clone()], MaxRows::Limited(3))?;
- assert_eq!(all_rows_expected, maxrows_equals_actual);
- let multi_batches = format_batches_with_maxrows(
+ let no_limit = run_test(&[batch.clone()], |buffer, batches| {
+ format_batches_with_maxrows(buffer, batches, MaxRows::Unlimited)
+ })?;
+ assert_eq!(no_limit, all_rows_expected);
+
+ let maxrows_less_than_actual = run_test(&[batch.clone()], |buffer, batches| {
+ format_batches_with_maxrows(buffer, batches, MaxRows::Limited(1))
+ })?;
+ assert_eq!(maxrows_less_than_actual, one_row_expected);
+
+ let maxrows_more_than_actual = run_test(&[batch.clone()], |buffer, batches| {
+ format_batches_with_maxrows(buffer, batches, MaxRows::Limited(5))
+ })?;
+ assert_eq!(maxrows_more_than_actual, all_rows_expected);
+
+ let maxrows_equals_actual = run_test(&[batch.clone()], |buffer, batches| {
+ format_batches_with_maxrows(buffer, batches, MaxRows::Limited(3))
+ })?;
+ assert_eq!(maxrows_equals_actual, all_rows_expected);
+
+ let multi_batches = run_test(
&[batch.clone(), batch.clone(), batch.clone()],
- MaxRows::Limited(5),
+ |buffer, batches| {
+ format_batches_with_maxrows(buffer, batches, MaxRows::Limited(5))
+ },
)?;
- assert_eq!(multi_batches_expected, multi_batches);
+ assert_eq!(multi_batches, multi_batches_expected);
Ok(())
}
diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs
index 0a6c8d4c36..b8594352b5 100644
--- a/datafusion-cli/src/print_options.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -15,13 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-use crate::print_format::PrintFormat;
-use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::error::Result;
use std::fmt::{Display, Formatter};
+use std::io::Write;
+use std::pin::Pin;
use std::str::FromStr;
use std::time::Instant;
+use crate::print_format::PrintFormat;
+
+use arrow::record_batch::RecordBatch;
+use datafusion::common::DataFusionError;
+use datafusion::error::Result;
+use datafusion::physical_plan::RecordBatchStream;
+
+use futures::StreamExt;
+
#[derive(Debug, Clone, PartialEq, Copy)]
pub enum MaxRows {
/// show all rows in the output
@@ -85,20 +93,70 @@ fn get_timing_info_str(
}
impl PrintOptions {
- /// print the batches to stdout using the specified format
+ /// Print the batches to stdout using the specified format
pub fn print_batches(
&self,
batches: &[RecordBatch],
query_start_time: Instant,
) -> Result<()> {
+ let stdout = std::io::stdout();
+ let mut writer = stdout.lock();
+
+ self.format
+ .print_batches(&mut writer, batches, self.maxrows, true)?;
+
let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
- // Elapsed time should not count time for printing batches
- let timing_info = get_timing_info_str(row_count, self.maxrows, query_start_time);
+ let timing_info = get_timing_info_str(
+ row_count,
+ if self.format == PrintFormat::Table {
+ self.maxrows
+ } else {
+ MaxRows::Unlimited
+ },
+ query_start_time,
+ );
+
+ if !self.quiet {
+ writeln!(writer, "{timing_info}")?;
+ }
+
+ Ok(())
+ }
+
+ /// Print the stream to stdout using the specified format
+ pub async fn print_stream(
+ &self,
+ mut stream: Pin<Box<dyn RecordBatchStream>>,
+ query_start_time: Instant,
+ ) -> Result<()> {
+ if self.format == PrintFormat::Table {
+ return Err(DataFusionError::External(
+ "PrintFormat::Table is not implemented".to_string().into(),
+ ));
+ };
+
+ let stdout = std::io::stdout();
+ let mut writer = stdout.lock();
+
+ let mut row_count = 0_usize;
+ let mut with_header = true;
+
+ while let Some(Ok(batch)) = stream.next().await {
+ row_count += batch.num_rows();
+ self.format.print_batches(
+ &mut writer,
+ &[batch],
+ MaxRows::Unlimited,
+ with_header,
+ )?;
+ with_header = false;
+ }
- self.format.print_batches(batches, self.maxrows)?;
+ let timing_info =
+ get_timing_info_str(row_count, MaxRows::Unlimited, query_start_time);
if !self.quiet {
- println!("{timing_info}");
+ writeln!(writer, "{timing_info}")?;
}
Ok(())
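
print_stream writes each batch as soon as it arrives, emitting the CSV/JSON
header only for the first batch, and rejects PrintFormat::Table because a
table cannot be rendered before the (possibly infinite) stream ends. A
minimal usage sketch, assuming a SessionContext `ctx` with an unbounded
table named `events` already registered (the table name is illustrative):

    use std::time::Instant;
    use datafusion::physical_plan::execute_stream;

    let df = ctx.sql("SELECT * FROM events").await?;
    let plan = df.create_physical_plan().await?;
    let stream = execute_stream(plan, ctx.task_ctx())?;
    // Prints batches incrementally; errors if print_options.format is Table.
    print_options.print_stream(stream, Instant::now()).await?;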
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 4a6ebeab09..5583991355 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -69,6 +69,7 @@ use arrow::{
use datafusion_common::{file_options::FileTypeWriterOptions, plan_err};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalSortExpr;
+use datafusion_physical_plan::ExecutionPlan;
use log::debug;
use object_store::path::Path;
@@ -507,6 +508,20 @@ fn get_projected_output_ordering(
all_orderings
}
+/// Get output (un)boundedness information for the given `plan`.
+pub fn is_plan_streaming(plan: &Arc<dyn ExecutionPlan>) -> Result<bool> {
+ if plan.children().is_empty() {
+ plan.unbounded_output(&[])
+ } else {
+ let children_unbounded_output = plan
+ .children()
+ .iter()
+ .map(is_plan_streaming)
+ .collect::<Result<Vec<_>>>();
+ plan.unbounded_output(&children_unbounded_output?)
+ }
+}
+
#[cfg(test)]
mod tests {
use arrow_array::cast::AsArray;
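
is_plan_streaming is a bottom-up fold over the plan tree: a leaf reports its
own unbounded_output(&[]), and each parent combines its children's
(un)boundedness through its unbounded_output implementation. A usage sketch
mirroring the call site in exec.rs (assuming `physical_plan` is the
Arc<dyn ExecutionPlan> returned by create_physical_plan):

    use datafusion::datasource::physical_plan::is_plan_streaming;

    if is_plan_streaming(&physical_plan)? {
        // Some leaf never terminates, so results must be consumed
        // incrementally (execute_stream) rather than collected.
    }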