This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 963680dd23 feat: Better large output display in datafusion-cli with
--maxrows option (#7617)
963680dd23 is described below
commit 963680dd23f952c55ebf8ff973f8d8fa7d0070a1
Author: Yongting You <[email protected]>
AuthorDate: Fri Sep 22 05:30:41 2023 -0700
feat: Better large output display in datafusion-cli with --maxrows option
(#7617)
* Better large output display for CLI
* review comments
---
datafusion-cli/src/exec.rs | 21 ++++-
datafusion-cli/src/main.rs | 13 +++-
datafusion-cli/src/print_format.rs | 148 ++++++++++++++++++++++++++++++++++--
datafusion-cli/src/print_options.rs | 86 ++++++++++++++++-----
docs/source/user-guide/cli.md | 47 +++++++++---
5 files changed, 280 insertions(+), 35 deletions(-)
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index 7416b73971..100d7bce44 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -24,7 +24,7 @@ use crate::{
get_gcs_object_store_builder, get_oss_object_store_builder,
get_s3_object_store_builder,
},
- print_options::PrintOptions,
+ print_options::{MaxRows, PrintOptions},
};
use datafusion::sql::{parser::DFParser, sqlparser::dialect::dialect_from_str};
use datafusion::{
@@ -211,6 +211,15 @@ async fn exec_and_print(
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
for statement in statements {
let plan = ctx.state().statement_to_plan(statement).await?;
+
+ // For plans like `Explain`, ignore the `MaxRows` option and always display
all rows
+ let should_ignore_maxrows = matches!(
+ plan,
+ LogicalPlan::Explain(_)
+ | LogicalPlan::DescribeTable(_)
+ | LogicalPlan::Analyze(_)
+ );
+
let df = match &plan {
LogicalPlan::Ddl(DdlStatement::CreateExternalTable(cmd)) => {
create_external_table(ctx, cmd).await?;
@@ -220,8 +229,18 @@ async fn exec_and_print(
};
let results = df.collect().await?;
+
+ let print_options = if should_ignore_maxrows {
+ PrintOptions {
+ maxrows: MaxRows::Unlimited,
+ ..print_options.clone()
+ }
+ } else {
+ print_options.clone()
+ };
print_options.print_batches(&results, now)?;
}
+
Ok(())
}
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index 25f4126084..33a1caeb1b 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -23,7 +23,10 @@ use datafusion::execution::runtime_env::{RuntimeConfig,
RuntimeEnv};
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicFileCatalog;
use datafusion_cli::{
- exec, print_format::PrintFormat, print_options::PrintOptions,
DATAFUSION_CLI_VERSION,
+ exec,
+ print_format::PrintFormat,
+ print_options::{MaxRows, PrintOptions},
+ DATAFUSION_CLI_VERSION,
};
use mimalloc::MiMalloc;
use std::collections::HashMap;
@@ -122,6 +125,13 @@ struct Args {
help = "Specify the memory pool type 'greedy' or 'fair', default to
'greedy'"
)]
mem_pool_type: Option<PoolType>,
+
+ #[clap(
+ long,
+ help = "The max number of rows to display for 'Table'
format\n[default: 40] [possible values: numbers(0/10/...), inf(no limit)]",
+ default_value = "40"
+ )]
+ maxrows: MaxRows,
}
#[tokio::main]
@@ -179,6 +189,7 @@ pub async fn main() -> Result<()> {
let mut print_options = PrintOptions {
format: args.format,
quiet: args.quiet,
+ maxrows: args.maxrows,
};
let commands = args.command;
diff --git a/datafusion-cli/src/print_format.rs
b/datafusion-cli/src/print_format.rs
index 8d8c0e4a39..e2994bc140 100644
--- a/datafusion-cli/src/print_format.rs
+++ b/datafusion-cli/src/print_format.rs
@@ -16,6 +16,7 @@
// under the License.
//! Print format variants
+use crate::print_options::MaxRows;
use arrow::csv::writer::WriterBuilder;
use arrow::json::{ArrayWriter, LineDelimitedWriter};
use arrow::util::pretty::pretty_format_batches_with_options;
@@ -70,17 +71,86 @@ fn print_batches_with_sep(batches: &[RecordBatch],
delimiter: u8) -> Result<Stri
Ok(formatted)
}
+fn keep_only_maxrows(s: &str, maxrows: usize) -> String {
+ let lines: Vec<String> = s.lines().map(String::from).collect();
+
+    assert!(lines.len() >= maxrows + 4); // 4 non-data lines: top border, header row, separator, bottom border
+
+ let last_line = &lines[lines.len() - 1]; // bottom border line
+
+ let spaces = last_line.len().saturating_sub(4);
+ let dotted_line = format!("| .{:<spaces$}|", "", spaces = spaces);
+
+ let mut result = lines[0..(maxrows + 3)].to_vec(); // Keep top border and
`maxrows` lines
+ result.extend(vec![dotted_line; 3]); // Append ... lines
+ result.push(last_line.clone());
+
+ result.join("\n")
+}
+
+fn format_batches_with_maxrows(
+ batches: &[RecordBatch],
+ maxrows: MaxRows,
+) -> Result<String> {
+ match maxrows {
+ MaxRows::Limited(maxrows) => {
+ // Only format enough batches for maxrows
+ let mut filtered_batches = Vec::new();
+ let mut batches = batches;
+ let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
+ if row_count > maxrows {
+ let mut accumulated_rows = 0;
+
+ for batch in batches {
+ filtered_batches.push(batch.clone());
+ if accumulated_rows + batch.num_rows() > maxrows {
+ break;
+ }
+ accumulated_rows += batch.num_rows();
+ }
+
+ batches = &filtered_batches;
+ }
+
+ let mut formatted = format!(
+ "{}",
+ pretty_format_batches_with_options(batches,
&DEFAULT_FORMAT_OPTIONS)?,
+ );
+
+ if row_count > maxrows {
+ formatted = keep_only_maxrows(&formatted, maxrows);
+ }
+
+ Ok(formatted)
+ }
+ MaxRows::Unlimited => {
+ // maxrows not specified, print all rows
+ Ok(format!(
+ "{}",
+ pretty_format_batches_with_options(batches,
&DEFAULT_FORMAT_OPTIONS)?,
+ ))
+ }
+ }
+}
+
impl PrintFormat {
/// print the batches to stdout using the specified format
- pub fn print_batches(&self, batches: &[RecordBatch]) -> Result<()> {
+    /// `maxrows` option is only used for `Table` format:
+    /// If `maxrows` is `MaxRows::Limited(n)`, then at most n rows will be displayed
+    /// If `maxrows` is `MaxRows::Unlimited`, then every row will be displayed
+ pub fn print_batches(&self, batches: &[RecordBatch], maxrows: MaxRows) ->
Result<()> {
+ if batches.is_empty() {
+ return Ok(());
+ }
+
match self {
Self::Csv => println!("{}", print_batches_with_sep(batches,
b',')?),
Self::Tsv => println!("{}", print_batches_with_sep(batches,
b'\t')?),
Self::Table => {
- println!(
- "{}",
- pretty_format_batches_with_options(batches,
&DEFAULT_FORMAT_OPTIONS)?
- )
+ if maxrows == MaxRows::Limited(0) {
+ return Ok(());
+ }
+ println!("{}", format_batches_with_maxrows(batches, maxrows)?,)
}
Self::Json => println!("{}", batches_to_json!(ArrayWriter,
batches)),
Self::NdJson => {
@@ -157,4 +227,72 @@ mod tests {
assert_eq!("{\"a\":1,\"b\":4,\"c\":7}\n{\"a\":2,\"b\":5,\"c\":8}\n{\"a\":3,\"b\":6,\"c\":9}\n",
r);
Ok(())
}
+
+ #[test]
+ fn test_format_batches_with_maxrows() -> Result<()> {
+ let schema = Arc::new(Schema::new(vec![Field::new("a",
DataType::Int32, false)]));
+
+ let batch =
+ RecordBatch::try_new(schema,
vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])
+ .unwrap();
+
+ #[rustfmt::skip]
+ let all_rows_expected = [
+ "+---+",
+ "| a |",
+ "+---+",
+ "| 1 |",
+ "| 2 |",
+ "| 3 |",
+ "+---+",
+ ].join("\n");
+
+ #[rustfmt::skip]
+ let one_row_expected = [
+ "+---+",
+ "| a |",
+ "+---+",
+ "| 1 |",
+ "| . |",
+ "| . |",
+ "| . |",
+ "+---+",
+ ].join("\n");
+
+ #[rustfmt::skip]
+ let multi_batches_expected = [
+ "+---+",
+ "| a |",
+ "+---+",
+ "| 1 |",
+ "| 2 |",
+ "| 3 |",
+ "| 1 |",
+ "| 2 |",
+ "| . |",
+ "| . |",
+ "| . |",
+ "+---+",
+ ].join("\n");
+
+ let no_limit = format_batches_with_maxrows(&[batch.clone()],
MaxRows::Unlimited)?;
+ assert_eq!(all_rows_expected, no_limit);
+
+ let maxrows_less_than_actual =
+ format_batches_with_maxrows(&[batch.clone()],
MaxRows::Limited(1))?;
+ assert_eq!(one_row_expected, maxrows_less_than_actual);
+ let maxrows_more_than_actual =
+ format_batches_with_maxrows(&[batch.clone()],
MaxRows::Limited(5))?;
+ assert_eq!(all_rows_expected, maxrows_more_than_actual);
+ let maxrows_equals_actual =
+ format_batches_with_maxrows(&[batch.clone()],
MaxRows::Limited(3))?;
+ assert_eq!(all_rows_expected, maxrows_equals_actual);
+ let multi_batches = format_batches_with_maxrows(
+ &[batch.clone(), batch.clone(), batch.clone()],
+ MaxRows::Limited(5),
+ )?;
+ assert_eq!(multi_batches_expected, multi_batches);
+
+ Ok(())
+ }
}
diff --git a/datafusion-cli/src/print_options.rs
b/datafusion-cli/src/print_options.rs
index 33ba7ef086..0a6c8d4c36 100644
--- a/datafusion-cli/src/print_options.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -18,37 +18,89 @@
use crate::print_format::PrintFormat;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
+use std::fmt::{Display, Formatter};
+use std::str::FromStr;
use std::time::Instant;
+#[derive(Debug, Clone, PartialEq, Copy)]
+pub enum MaxRows {
+ /// show all rows in the output
+ Unlimited,
+ /// Only show n rows
+ Limited(usize),
+}
+
+impl FromStr for MaxRows {
+ type Err = String;
+
+ fn from_str(maxrows: &str) -> Result<Self, Self::Err> {
+ if maxrows.to_lowercase() == "inf"
+ || maxrows.to_lowercase() == "infinite"
+ || maxrows.to_lowercase() == "none"
+ {
+ Ok(Self::Unlimited)
+ } else {
+ match maxrows.parse::<usize>() {
+ Ok(nrows) => Ok(Self::Limited(nrows)),
+ _ => Err(format!("Invalid maxrows {}. Valid inputs are natural
numbers or \'none\', \'inf\', or \'infinite\' for no limit.", maxrows)),
+ }
+ }
+ }
+}
+
+impl Display for MaxRows {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ Self::Unlimited => write!(f, "unlimited"),
+ Self::Limited(max_rows) => write!(f, "at most {max_rows}"),
+ }
+ }
+}
+
#[derive(Debug, Clone)]
pub struct PrintOptions {
pub format: PrintFormat,
pub quiet: bool,
+ pub maxrows: MaxRows,
}
-fn print_timing_info(row_count: usize, now: Instant) {
- println!(
- "{} {} in set. Query took {:.3} seconds.\n",
+fn get_timing_info_str(
+ row_count: usize,
+ maxrows: MaxRows,
+ query_start_time: Instant,
+) -> String {
+ let row_word = if row_count == 1 { "row" } else { "rows" };
+ let nrows_shown_msg = match maxrows {
+ MaxRows::Limited(nrows) if nrows < row_count => format!(" ({} shown)",
nrows),
+ _ => String::new(),
+ };
+
+ format!(
+ "{} {} in set{}. Query took {:.3} seconds.\n",
row_count,
- if row_count == 1 { "row" } else { "rows" },
- now.elapsed().as_secs_f64()
- );
+ row_word,
+ nrows_shown_msg,
+ query_start_time.elapsed().as_secs_f64()
+ )
}
impl PrintOptions {
/// print the batches to stdout using the specified format
- pub fn print_batches(&self, batches: &[RecordBatch], now: Instant) ->
Result<()> {
- if batches.is_empty() {
- if !self.quiet {
- print_timing_info(0, now);
- }
- } else {
- self.format.print_batches(batches)?;
- if !self.quiet {
- let row_count: usize = batches.iter().map(|b|
b.num_rows()).sum();
- print_timing_info(row_count, now);
- }
+ pub fn print_batches(
+ &self,
+ batches: &[RecordBatch],
+ query_start_time: Instant,
+ ) -> Result<()> {
+ let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
+ // Elapsed time should not count time for printing batches
+ let timing_info = get_timing_info_str(row_count, self.maxrows,
query_start_time);
+
+ self.format.print_batches(batches, self.maxrows)?;
+
+ if !self.quiet {
+ println!("{timing_info}");
}
+
Ok(())
}
}
diff --git a/docs/source/user-guide/cli.md b/docs/source/user-guide/cli.md
index e1f332baf3..05b4165e61 100644
--- a/docs/source/user-guide/cli.md
+++ b/docs/source/user-guide/cli.md
@@ -75,17 +75,42 @@ USAGE:
datafusion-cli [OPTIONS]
OPTIONS:
- -c, --batch-size <BATCH_SIZE> The batch size of each query, or
use DataFusion default
- -f, --file <FILE>... Execute commands from file(s),
then exit
- --format <FORMAT> [default: table] [possible values:
csv, tsv, table, json,
- nd-json]
- -h, --help Print help information
- -m, --memory-limit <MEMORY_LIMIT> The memory pool limitation (e.g.
'10g'), default to None (no limit)
- --mem-pool-type <MEM_POOL_TYPE> Specify the memory pool type
'greedy' or 'fair', default to 'greedy'
- -p, --data-path <DATA_PATH> Path to your data, default to
current directory
- -q, --quiet Reduce printing other than the
results and work quietly
- -r, --rc <RC>... Run the provided files on startup
instead of ~/.datafusionrc
- -V, --version Print version information
+ -b, --batch-size <BATCH_SIZE>
+ The batch size of each query, or use DataFusion default
+
+ -c, --command <COMMAND>...
+ Execute the given command string(s), then exit
+
+ -f, --file <FILE>...
+ Execute commands from file(s), then exit
+
+ --format <FORMAT>
+ [default: table] [possible values: csv, tsv, table, json, nd-json]
+
+ -h, --help
+ Print help information
+
+ -m, --memory-limit <MEMORY_LIMIT>
+ The memory pool limitation (e.g. '10g'), default to None (no limit)
+
+ --maxrows <MAXROWS>
+ The max number of rows to display for 'Table' format
+ [default: 40] [possible values: numbers(0/10/...), inf(no limit)]
+
+ --mem-pool-type <MEM_POOL_TYPE>
+ Specify the memory pool type 'greedy' or 'fair', default to
'greedy'
+
+ -p, --data-path <DATA_PATH>
+ Path to your data, default to current directory
+
+ -q, --quiet
+ Reduce printing other than the results and work quietly
+
+ -r, --rc <RC>...
+ Run the provided files on startup instead of ~/.datafusionrc
+
+ -V, --version
+ Print version information
```
## Querying data from the files directly