alamb commented on code in PR #17021:
URL: https://github.com/apache/datafusion/pull/17021#discussion_r2279058579

##########
datafusion-cli/README.md:
##########
@@ -30,3 +30,33 @@ DataFusion CLI (`datafusion-cli`) is a small command line utility that runs SQL
 ## Where can I find more information?
 
 See the [`datafusion-cli` documentation](https://datafusion.apache.org/user-guide/cli/index.html) for further information.
+
+## Memory Profiling
+
+> **Tip:** Memory profiling requires the tracked pool. Start the CLI with `--top-memory-consumers N` (N≥1), or profiling will report no metrics. By default, CLI starts with --top-memory-consumers 5.
+
+Enable memory tracking for the next query and display the report afterwards:
+
+```text
+> \memory_profiling enable
+Memory profiling enabled
+> SELECT v % 100 AS group_key, COUNT(*) AS cnt, SUM(v) AS sum_v FROM generate_series(1,100000) AS t(v) GROUP BY group_key ORDER BY group_key;
+
++-----------+------+----------+
+| group_key | cnt  | sum_v    |
++-----------+------+----------+
+| 0         | 1000 | 50050000 |
+| 1         | 1000 | 49951000 |
+| 2         | 1000 | 49952000 |
+...
+
+\memory_profiling show

Review Comment:
Thanks @kosiew.

In terms of user experience, I feel like `\memory_profiling show` is redundant. Specifically, if I as a user enabled memory profiling, I would expect every query I ran to produce a memory report until profiling was disabled. That would mean the report is always shown, with no separate `show` step.

##########
datafusion-cli/src/main.rs:
##########
@@ -174,27 +176,15 @@ async fn main_inner() -> Result<()> {
     let session_config = get_session_config(&args)?;
     let mut rt_builder = RuntimeEnvBuilder::new();
-    // set memory pool size
+    let mut base_pool: Option<Arc<dyn MemoryPool>> = None;

Review Comment:
Does this disable memory tracking by default?

I was worried that this would be a regression: when a query OOMs, it would no longer show the top memory consumers. However, I tried it locally and it still seems to work:

```shell
andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion$ cargo run -p datafusion-cli -- -m 10m
warning: unused variable: `options`
   --> datafusion/datasource-parquet/src/reader.rs:247:9
    |
247 |         options: Option<&'a ArrowReaderOptions>,
    |         ^^^^^^^ help: if this is intentional, prefix it with an underscore: `_options`
    |
    = note: `#[warn(unused_variables)]` on by default

warning: `datafusion-datasource-parquet` (lib) generated 1 warning
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.14s
     Running `target/debug/datafusion-cli -m 10m`
DataFusion CLI v49.0.0
> select distinct "ADDRESS1", "ADDRESS2" from '/Users/andrewlamb/Downloads/f94d0c87-8798-4bf6-9c98-8d89971e2539.parquet';
Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
  GroupedHashAggregateStream[11] ()#27(can spill: true) consumed 4.7 MB,
  GroupedHashAggregateStream[0] ()#15(can spill: true) consumed 4.5 MB,
  GroupedHashAggregateStream[2] ()#18(can spill: true) consumed 0.0 B,
  GroupedHashAggregateStream[10] ()#26(can spill: true) consumed 0.0 B,
  RepartitionExec[5]#38(can spill: false) consumed 0.0 B.
Error: Failed to allocate additional 1065.0 KB for RepartitionExec[0] with 0.0 B already allocated for this reservation - 844.8 KB remain available for the total pool
```

##########
datafusion-cli/src/cli_context.rs:
##########
@@ -15,19 +15,69 @@
 // specific language governing permissions and limitations
 // under the License.

-use std::sync::Arc;
+use std::num::NonZeroUsize;
+use std::sync::{
+    atomic::{AtomicBool, Ordering},
+    Arc, RwLock,
+};
 
 use datafusion::{
     dataframe::DataFrame,
     error::DataFusionError,
-    execution::{context::SessionState, TaskContext},
+    execution::{
+        context::SessionState,
+        memory_pool::{
+            MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
+            TrackConsumersPool, TrackedPool,
+        },
+        runtime_env::RuntimeEnvBuilder,
+        session_state::SessionStateBuilder,
+        TaskContext,
+    },
     logical_expr::LogicalPlan,
     prelude::SessionContext,
 };
 use object_store::ObjectStore;
 
 use crate::object_storage::{AwsOptions, GcpOptions};
 
+#[derive(Debug)]

Review Comment:
As I understand it, this code makes an `Arc<dyn MemoryPool>` implement the `MemoryPool` API directly. However, since we can already clone an `Arc`, I don't understand why this code is necessary, so I played around with it. Here is an alternate suggestion that I think is more general: automatically implement `MemoryPool` for any `Arc<dyn MemoryPool>`:
- https://github.com/kosiew/datafusion/pull/30

##########
datafusion-cli/src/command.rs:
##########
@@ -206,11 +243,31 @@ impl FromStr for Command {
                 Self::OutputFormat(Some(subcommand.to_string()))
             }
             ("pset", None) => Self::OutputFormat(None),
+            ("memory_profiling", sub) => {

Review Comment:
What do you think about supporting `\memory_profiling` without an argument, so that it works as a toggle to enable/disable printing?

##########
datafusion-cli/src/cli_context.rs:
##########
@@ -93,6 +156,128 @@ impl CliSessionContext for SessionContext {
         &self,
         plan: LogicalPlan,
     ) -> Result<DataFrame, DataFusionError> {
-        self.execute_logical_plan(plan).await
+        SessionContext::execute_logical_plan(self, plan).await
+    }
+}
+
+/// Session context used by the CLI with memory profiling support.
+pub struct ReplSessionContext {
+    ctx: SessionContext,
+    memory_profiling: AtomicBool,
+    base_memory_pool: Arc<dyn MemoryPool>,
+    tracked_memory_pool: RwLock<Option<Arc<dyn TrackedPool>>>,
+    top_memory_consumers: usize,
+}
+
+impl ReplSessionContext {
+    pub fn new(
+        ctx: SessionContext,
+        base_memory_pool: Arc<dyn MemoryPool>,
+        top_memory_consumers: usize,
+    ) -> Self {
+        Self {
+            ctx,
+            memory_profiling: AtomicBool::new(false),
+            base_memory_pool,
+            tracked_memory_pool: RwLock::new(None),
+            top_memory_consumers,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl CliSessionContext for ReplSessionContext {
+    fn task_ctx(&self) -> Arc<TaskContext> {
+        self.ctx.task_ctx()
+    }
+
+    fn session_state(&self) -> SessionState {
+        self.ctx.state()
+    }
+
+    fn register_object_store(
+        &self,
+        url: &url::Url,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore + 'static>> {
+        self.ctx.register_object_store(url, object_store)
+    }
+
+    fn register_table_options_extension_from_scheme(&self, scheme: &str) {
+        match scheme {
+            // For Amazon S3 or Alibaba Cloud OSS
+            "s3" | "oss" | "cos" => self
+                .ctx
+                .register_table_options_extension(AwsOptions::default()),
+            // For Google Cloud Storage
+            "gs" | "gcs" => self
+                .ctx
+                .register_table_options_extension(GcpOptions::default()),
+            // For unsupported schemes, do nothing:
+            _ => {}
+        }
+    }
+
+    async fn execute_logical_plan(
+        &self,
+        plan: LogicalPlan,
+    ) -> Result<DataFrame, DataFusionError> {
+        self.ctx.execute_logical_plan(plan).await
+    }
+
+    fn memory_profiling(&self) -> bool {
+        self.memory_profiling.load(Ordering::Relaxed)
+    }
+
+    fn set_memory_profiling(&self, enable: bool) {
+        if enable {
+            if self.top_memory_consumers == 0 {
+                return;
+            }
+            if self.memory_profiling.swap(true, Ordering::Relaxed) {
+                return;
+            }
+            let tracked = Arc::new(TrackConsumersPool::new(

Review Comment:
One thing I don't understand about this design: the current datafusion-cli always uses a `TrackConsumersPool`, but this PR seems to install/uninstall a tracked pool based on the `memory_consumers` setting.

Would it be possible to *always* use the same `TrackConsumersPool` that is used today, and have `\memory_profiling` control only when that information is displayed? I think the PR would get a lot simpler then: it would just be a matter of adding logic to display the `TrackConsumersPool` that is already present.

##########
datafusion-cli/src/cli_context.rs:
##########
@@ -93,6 +156,128 @@ impl CliSessionContext for SessionContext {
         &self,
         plan: LogicalPlan,
     ) -> Result<DataFrame, DataFusionError> {
-        self.execute_logical_plan(plan).await
+        SessionContext::execute_logical_plan(self, plan).await
+    }
+}
+
+/// Session context used by the CLI with memory profiling support.
+pub struct ReplSessionContext {
+    ctx: SessionContext,
+    memory_profiling: AtomicBool,
+    base_memory_pool: Arc<dyn MemoryPool>,
+    tracked_memory_pool: RwLock<Option<Arc<dyn TrackedPool>>>,
+    top_memory_consumers: usize,
+}
+
+impl ReplSessionContext {
+    pub fn new(
+        ctx: SessionContext,
+        base_memory_pool: Arc<dyn MemoryPool>,
+        top_memory_consumers: usize,
+    ) -> Self {
+        Self {
+            ctx,
+            memory_profiling: AtomicBool::new(false),
+            base_memory_pool,
+            tracked_memory_pool: RwLock::new(None),
+            top_memory_consumers,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl CliSessionContext for ReplSessionContext {

Review Comment:
Does this mean we can remove the impl for the normal `SessionContext`?
https://github.com/apache/datafusion/blob/e4e1cee82a734502ef79186e1bb16dddc237443a/datafusion-cli/src/cli_context.rs#L121-L120
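For illustration, here is a minimal std-only Rust sketch of the flow these review comments suggest: consumer tracking stays on permanently, `\memory_profiling` with no argument toggles display, and a report is produced after every query while display is enabled. All names (`ConsumerTracker`, `Repl`, `memory_profiling_command`, `report_after_query`) are hypothetical; `ConsumerTracker` is only a simplified stand-in for DataFusion's `TrackConsumersPool` and does not use its API.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for DataFusion's `TrackConsumersPool`: it records
/// per-consumer usage at all times, whether or not a report is displayed.
#[derive(Default)]
struct ConsumerTracker {
    usage: Mutex<HashMap<String, usize>>,
}

impl ConsumerTracker {
    /// Adds `bytes` to the running total for `consumer`.
    fn record(&self, consumer: &str, bytes: usize) {
        *self
            .usage
            .lock()
            .unwrap()
            .entry(consumer.to_string())
            .or_insert(0) += bytes;
    }

    /// Returns up to `n` consumers, largest first (ties broken by name).
    fn top(&self, n: usize) -> Vec<(String, usize)> {
        let mut entries: Vec<(String, usize)> = self
            .usage
            .lock()
            .unwrap()
            .iter()
            .map(|(name, bytes)| (name.clone(), *bytes))
            .collect();
        entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        entries.truncate(n);
        entries
    }
}

/// Hypothetical REPL state: the tracker is always present; the flag only
/// decides whether a report is printed after each query.
struct Repl {
    tracker: ConsumerTracker,
    show_report: AtomicBool,
    top_n: usize,
}

impl Repl {
    /// Handles `\memory_profiling [enable|disable]`; no argument toggles.
    fn memory_profiling_command(&self, arg: Option<&str>) -> Result<bool, String> {
        let enabled = match arg {
            None => !self.show_report.load(Ordering::Relaxed),
            Some("enable") => true,
            Some("disable") => false,
            Some(other) => return Err(format!("unknown argument: {other}")),
        };
        self.show_report.store(enabled, Ordering::Relaxed);
        Ok(enabled)
    }

    /// Called after every query; returns a report only while display is on.
    fn report_after_query(&self) -> Option<String> {
        if !self.show_report.load(Ordering::Relaxed) {
            return None;
        }
        let lines: Vec<String> = self
            .tracker
            .top(self.top_n)
            .into_iter()
            .map(|(name, bytes)| format!("{name}: {bytes} B"))
            .collect();
        Some(lines.join("\n"))
    }
}

fn main() {
    let repl = Repl {
        tracker: ConsumerTracker::default(),
        show_report: AtomicBool::new(false),
        top_n: 2,
    };
    // Illustrative numbers only.
    repl.tracker.record("GroupedHashAggregateStream[11]", 4096);
    repl.tracker.record("GroupedHashAggregateStream[0]", 2048);
    repl.tracker.record("RepartitionExec[5]", 512);

    assert!(repl.report_after_query().is_none()); // display is off by default
    repl.memory_profiling_command(None).unwrap(); // `\memory_profiling` toggles it on
    if let Some(report) = repl.report_after_query() {
        println!("{report}");
    }
}
```

In this sketch the tracker is never swapped out, so an OOM error path could keep reporting top consumers regardless of the display flag; the flag only gates the per-query report, which is what makes a separate `show` subcommand unnecessary.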