alamb commented on code in PR #17021:
URL: https://github.com/apache/datafusion/pull/17021#discussion_r2289199544


##########
datafusion/core/src/dataframe/mod.rs:
##########
@@ -1373,7 +1373,8 @@ impl DataFrame {
     pub async fn collect(self) -> Result<Vec<RecordBatch>> {
         let task_ctx = Arc::new(self.task_ctx());
         let plan = self.create_physical_plan().await?;
-        collect(plan, task_ctx).await
+        let batches = collect(plan, task_ctx).await?;

Review Comment:
   what is the purpose of these changes?



##########
datafusion-cli/tests/snapshots/aws_options.snap:
##########
@@ -12,6 +12,11 @@ exit_code: 0
 0 row(s) fetched. 
 [ELAPSED]
 
+Peak memory usage: 0.0 B

Review Comment:
   this change shows that memory profiling is enabled by default. I think memory 
profiling would be more useful if we had it off by default and then enabled 
when \profiling was on



##########
datafusion/core/src/execution/session_state.rs:
##########
@@ -16,69 +16,76 @@
 // under the License.
 
 //! [`SessionState`]: information required to run queries in a session
-
-use std::any::Any;
-use std::collections::hash_map::Entry;
-use std::collections::{HashMap, HashSet};
-use std::fmt::Debug;
-use std::sync::Arc;
-
-use crate::catalog::{CatalogProviderList, SchemaProvider, 
TableProviderFactory};
-use crate::datasource::cte_worktable::CteWorkTable;
-use crate::datasource::file_format::{format_as_file_type, FileFormatFactory};
-use crate::datasource::provider_as_source;
-use crate::execution::context::{EmptySerializerRegistry, FunctionFactory, 
QueryPlanner};
-use crate::execution::SessionStateDefaults;
-use crate::physical_planner::{DefaultPhysicalPlanner, PhysicalPlanner};
+use crate::{

Review Comment:
   I think these changes lead to unnecessary churn and make the code base more 
inconsistent -- can we please pull them out into their own PR if you want to 
refactor the `use` statements?



##########
datafusion-cli/src/cli_context.rs:
##########
@@ -93,6 +116,120 @@ impl CliSessionContext for SessionContext {
         &self,
         plan: LogicalPlan,
     ) -> Result<DataFrame, DataFusionError> {
-        self.execute_logical_plan(plan).await
+        SessionContext::execute_logical_plan(self, plan).await
+    }
+}
+
+/// Session context used by the CLI with memory profiling support.
+pub struct ReplSessionContext {
+    ctx: SessionContext,
+    memory_profiling: AtomicBool,
+    tracked_memory_pool: Option<Arc<dyn TrackedPool>>,
+}
+
+impl ReplSessionContext {
+    pub fn new(
+        ctx: SessionContext,
+        base_memory_pool: Arc<dyn MemoryPool>,
+        top_memory_consumers: usize,
+    ) -> Self {
+        let tracked_memory_pool = if top_memory_consumers > 0 {

Review Comment:
   this seems like it is redundant with the top level pool that is installed 
with the CLI (which also may be a tracked pool)



##########
datafusion-cli/README.md:
##########
@@ -30,3 +30,31 @@ DataFusion CLI (`datafusion-cli`) is a small command line 
utility that runs SQL
 ## Where can I find more information?
 
 See the [`datafusion-cli` 
documentation](https://datafusion.apache.org/user-guide/cli/index.html) for 
further information.
+
+## Memory Profiling

Review Comment:
   This seems like it should be in the CLI documentation rather than this README



##########
datafusion-cli/src/command.rs:
##########
@@ -206,11 +240,30 @@ impl FromStr for Command {
                 Self::OutputFormat(Some(subcommand.to_string()))
             }
             ("pset", None) => Self::OutputFormat(None),
+            ("memory_profiling", sub) => {
+                let sub = match sub {
+                    Some(s) => 
Some(s.parse::<MemoryProfilingCommand>().map_err(|_| ())?),
+                    None => None,
+                };
+                Self::MemoryProfiling(sub)
+            }
             _ => return Err(()),
         })
     }
 }
 
+impl FromStr for MemoryProfilingCommand {

Review Comment:
   this seems a bit left over from when there was a command that took a value



##########
datafusion-cli/src/main.rs:
##########
@@ -174,27 +176,13 @@ async fn main_inner() -> Result<()> {
     let session_config = get_session_config(&args)?;
 
     let mut rt_builder = RuntimeEnvBuilder::new();
-    // set memory pool size
     if let Some(memory_limit) = args.memory_limit {
         // set memory pool type
         let pool: Arc<dyn MemoryPool> = match args.mem_pool_type {
-            PoolType::Fair if args.top_memory_consumers == 0 => {
-                Arc::new(FairSpillPool::new(memory_limit))
-            }
-            PoolType::Fair => Arc::new(TrackConsumersPool::new(
-                FairSpillPool::new(memory_limit),
-                NonZeroUsize::new(args.top_memory_consumers).unwrap(),
-            )),
-            PoolType::Greedy if args.top_memory_consumers == 0 => {
-                Arc::new(GreedyMemoryPool::new(memory_limit))
-            }
-            PoolType::Greedy => Arc::new(TrackConsumersPool::new(
-                GreedyMemoryPool::new(memory_limit),
-                NonZeroUsize::new(args.top_memory_consumers).unwrap(),
-            )),
+            PoolType::Fair => Arc::new(FairSpillPool::new(memory_limit)),

Review Comment:
   if we change this, doesn't it mean that we will break the existing behavior 
of showing the top consumers when memory profiling is not enabled?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to