This is an automated email from the ASF dual-hosted git repository.

ytyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 83487e3c6b feat: Improve datafusion-cli memory usage and considering reserve mem… (#14766)
83487e3c6b is described below

commit 83487e3c6bf60312e94c7cd68b0c100cdd82f21e
Author: Qi Zhu <[email protected]>
AuthorDate: Fri Feb 21 11:57:39 2025 +0800

    feat: Improve datafusion-cli memory usage and considering reserve mem… (#14766)
    
    * feat: Improve datafusion-cli memory usage and considering reserve memory for the result batches
    
    * Address new comments
    
    * Address new comments
    
    * fix test
    
    * fix test
    
    * Address comments
    
    * Fix doc
    
    * Fix row count showing
    
    * Fix fmt
    
    * fix corner case
    
    * remove unused code
---
 datafusion-cli/src/command.rs       |  4 +++-
 datafusion-cli/src/exec.rs          | 43 +++++++++++++++++++++++++++++--------
 datafusion-cli/src/print_options.rs |  2 +-
 3 files changed, 38 insertions(+), 11 deletions(-)

diff --git a/datafusion-cli/src/command.rs b/datafusion-cli/src/command.rs
index f0eb58a233..fc7d1a2617 100644
--- a/datafusion-cli/src/command.rs
+++ b/datafusion-cli/src/command.rs
@@ -62,7 +62,9 @@ impl Command {
             Self::Help => {
                 let now = Instant::now();
                 let command_batch = all_commands_info();
-                print_options.print_batches(command_batch.schema(), 
&[command_batch], now)
+                let schema = command_batch.schema();
+                let num_rows = command_batch.num_rows();
+                print_options.print_batches(schema, &[command_batch], now, 
num_rows)
             }
             Self::ListTables => {
                 exec_and_print(ctx, print_options, "SHOW TABLES".into()).await
diff --git a/datafusion-cli/src/exec.rs b/datafusion-cli/src/exec.rs
index a4f154b2de..84664794b7 100644
--- a/datafusion-cli/src/exec.rs
+++ b/datafusion-cli/src/exec.rs
@@ -17,11 +17,6 @@
 
 //! Execution functions
 
-use std::collections::HashMap;
-use std::fs::File;
-use std::io::prelude::*;
-use std::io::BufReader;
-
 use crate::cli_context::CliSessionContext;
 use crate::helper::split_from_semicolon;
 use crate::print_format::PrintFormat;
@@ -31,6 +26,11 @@ use crate::{
     object_storage::get_object_store,
     print_options::{MaxRows, PrintOptions},
 };
+use futures::StreamExt;
+use std::collections::HashMap;
+use std::fs::File;
+use std::io::prelude::*;
+use std::io::BufReader;
 
 use datafusion::common::instant::Instant;
 use datafusion::common::{plan_datafusion_err, plan_err};
@@ -39,10 +39,12 @@ use datafusion::datasource::listing::ListingTableUrl;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::logical_expr::{DdlStatement, LogicalPlan};
 use datafusion::physical_plan::execution_plan::EmissionType;
-use datafusion::physical_plan::{collect, execute_stream, 
ExecutionPlanProperties};
+use datafusion::physical_plan::{execute_stream, ExecutionPlanProperties};
 use datafusion::sql::parser::{DFParser, Statement};
 use datafusion::sql::sqlparser::dialect::dialect_from_str;
 
+use datafusion::execution::memory_pool::MemoryConsumer;
+use datafusion::physical_plan::spill::get_record_batch_memory_size;
 use datafusion::sql::sqlparser;
 use rustyline::error::ReadlineError;
 use rustyline::Editor;
@@ -235,6 +237,10 @@ pub(super) async fn exec_and_print(
         let df = ctx.execute_logical_plan(plan).await?;
         let physical_plan = df.create_physical_plan().await?;
 
+        // Track memory usage for the query result if it's bounded
+        let mut reservation =
+            
MemoryConsumer::new("DataFusion-Cli").register(task_ctx.memory_pool());
+
         if physical_plan.boundedness().is_unbounded() {
             if physical_plan.pipeline_behavior() == EmissionType::Final {
                 return plan_err!(
@@ -247,10 +253,29 @@ pub(super) async fn exec_and_print(
             let stream = execute_stream(physical_plan, task_ctx.clone())?;
             print_options.print_stream(stream, now).await?;
         } else {
-            // Bounded stream; collected results are printed after all input 
consumed.
+            // Bounded stream; collected results size is limited by the 
maxrows option
             let schema = physical_plan.schema();
-            let results = collect(physical_plan, task_ctx.clone()).await?;
-            adjusted.into_inner().print_batches(schema, &results, now)?;
+            let mut stream = execute_stream(physical_plan, task_ctx.clone())?;
+            let mut results = vec![];
+            let mut row_count = 0_usize;
+            while let Some(batch) = stream.next().await {
+                let batch = batch?;
+                let curr_num_rows = batch.num_rows();
+                if let MaxRows::Limited(max_rows) = print_options.maxrows {
+                    // Stop collecting results if the number of rows exceeds 
the limit
+                    // results batch should include the last batch that 
exceeds the limit
+                    if row_count < max_rows + curr_num_rows {
+                        // Try to grow the reservation to accommodate the 
batch in memory
+                        
reservation.try_grow(get_record_batch_memory_size(&batch))?;
+                        results.push(batch);
+                    }
+                }
+                row_count += curr_num_rows;
+            }
+            adjusted
+                .into_inner()
+                .print_batches(schema, &results, now, row_count)?;
+            reservation.free();
         }
     }
 
diff --git a/datafusion-cli/src/print_options.rs 
b/datafusion-cli/src/print_options.rs
index e80cc55663..9557e783e8 100644
--- a/datafusion-cli/src/print_options.rs
+++ b/datafusion-cli/src/print_options.rs
@@ -102,6 +102,7 @@ impl PrintOptions {
         schema: SchemaRef,
         batches: &[RecordBatch],
         query_start_time: Instant,
+        row_count: usize,
     ) -> Result<()> {
         let stdout = std::io::stdout();
         let mut writer = stdout.lock();
@@ -109,7 +110,6 @@ impl PrintOptions {
         self.format
             .print_batches(&mut writer, schema, batches, self.maxrows, true)?;
 
-        let row_count: usize = batches.iter().map(|b| b.num_rows()).sum();
         let formatted_exec_details = get_execution_details_formatted(
             row_count,
             if self.format == PrintFormat::Table {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to