This is an automated email from the ASF dual-hosted git repository.
ytyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 83487e3c6b feat: Improve datafusion-cli memory usage and considering reserve mem… (#14766)
83487e3c6b is described below
commit 83487e3c6bf60312e94c7cd68b0c100cdd82f21e
Author: Qi Zhu <[email protected]>
AuthorDate: Fri Feb 21 11:57:39 2025 +0800
feat: Improve datafusion-cli memory usage and considering reserve mem… (#14766)
* feat: Improve datafusion-cli memory usage and considering reserve memory
for the result batches
* Address new comments
* Address new comments
* fix test
* fix test
* Address comments
* Fix doc
* Fix row count showing
* Fix fmt
* fix corner case
* remove unused code
---
datafusion-cli/src/command.rs | 4 +++-
datafusion-cli/src/exec.rs | 43 +++++++++++++++++++++++++++++--------
datafusion-cli/src/print_options.rs | 2 +-
3 files changed, 38 insertions(+), 11 deletions(-)
diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs
index f0eb58a233..fc7d1a2617 100644
--- a/datafusion-cli/src/command.rs
+++ b/datafusion-cli/src/command.rs
@@ -62,7 +62,9 @@ impl Command {
Self::Help => {
let now = Instant::now();
let command_batch = all_commands_info();
- print_options.print_batches(command_batch.schema(), &[command_batch], now)
+ let schema = command_batch.schema();
+ let num_rows = command_batch.num_rows();
+ print_options.print_batches(schema, &[command_batch], now, num_rows)
}
Self::ListTables => {
exec_and_print(ctx, print_options, "SHOW TABLES".into()).await
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index a4f154b2de..84664794b7 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -17,11 +17,6 @@
//! Execution functions
-use std::collections::HashMap;
-use std::fs::File;
-use std::io::prelude::*;
-use std::io::BufReader;
-
use crate::cli_context::CliSessionContext;
use crate::helper::split_from_semicolon;
use crate::print_format::PrintFormat;
@@ -31,6 +26,11 @@ use crate::{
object_storage::get_object_store,
print_options::{MaxRows, PrintOptions},
};
+use futures::StreamExt;
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::prelude::*;
+use std::io::BufReader;
use datafusion::common::instant::Instant;
use datafusion::common::{plan_datafusion_err, plan_err};
@@ -39,10 +39,12 @@ use datafusion::datasource::listing::ListingTableUrl;
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::{DdlStatement, LogicalPlan};
use datafusion::physical_plan::execution_plan::EmissionType;
-use datafusion::physical_plan::{collect, execute_stream, ExecutionPlanProperties};
+use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
use datafusion::sql::parser::{DFParser, Statement};
use datafusion::sql::sqlparser::dialect::dialect_from_str;
+use datafusion::execution::memory_pool::MemoryConsumer;
+use datafusion::physical_plan::spill::get_record_batch_memory_size;
use datafusion::sql::sqlparser;
use rustyline::error::ReadlineError;
use rustyline::Editor;
@@ -235,6 +237,10 @@ pub(super) async fn exec_and_print(
let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;
+ // Track memory usage for the query result if it's bounded
+ let mut reservation =
+ MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
+
if physical_plan.boundedness().is_unbounded() {
if physical_plan.pipeline_behavior() == EmissionType::Final {
return plan_err!(
@@ -247,10 +253,29 @@ pub(super) async fn exec_and_print(
let stream = execute_stream(physical_plan, task_ctx.clone())?;
print_options.print_stream(stream, now).await?;
} else {
- // Bounded stream; collected results are printed after all input consumed.
+ // Bounded stream; collected results size is limited by the maxrows option
let schema = physical_plan.schema();
- let results = collect(physical_plan, task_ctx.clone()).await?;
- adjusted.into_inner().print_batches(schema, &results, now)?;
+ let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
+ let mut results = vec![];
+ let mut row_count = 0_usize;
+ while let Some(batch) = stream.next().await {
+ let batch = batch?;
+ let curr_num_rows = batch.num_rows();
+ if let MaxRows::Limited(max_rows) = print_options.maxrows {
+ // Stop collecting results if the number of rows exceeds the limit
+ // results batch should include the last batch that exceeds the limit
+ if row_count < max_rows + curr_num_rows {
+ // Try to grow the reservation to accommodate the batch in memory
+ reservation.try_grow(get_record_batch_memory_size(&batch))?;
+ results.push(batch);
+ }
+ }
+ row_count += curr_num_rows;
+ }
+ adjusted
+ .into_inner()
+ .print_batches(schema, &results, now, row_count)?;
+ reservation.free();
}
}
diff --git a/datafusion-cli/src/print_options.rs b/datafusion-cli/src/print_options.rs
index e80cc55663..9557e783e8 100644
--- a/datafusion-cli/src/print_options.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -102,6 +102,7 @@ impl PrintOptions {
schema: SchemaRef,
batches: &[RecordBatch],
query_start_time: Instant,
+ row_count: usize,
) -> Result<()> {
let stdout = std::io::stdout();
let mut writer = stdout.lock();
@@ -109,7 +110,6 @@ impl PrintOptions {
self.format
.print_batches(&mut writer, schema, batches, self.maxrows, true)?;
- let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
let formatted_exec_details = get_execution_details_formatted(
row_count,
if self.format == PrintFormat::Table {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]