UBarney commented on code in PR #16814: URL: https://github.com/apache/datafusion/pull/16814#discussion_r2235510316
########## benchmarks/src/bin/mem_profile.rs: ########## @@ -0,0 +1,360 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! mem_profile binary entrypoint +use datafusion::error::Result; +use std::{ + env, + io::{BufRead, BufReader}, + path::Path, + process::{Command, Stdio}, +}; +use structopt::StructOpt; + +use datafusion_benchmarks::{ + clickbench, + h2o::{self, AllQueries}, + imdb, sort_tpch, tpch, +}; + +#[derive(Debug, StructOpt)] +#[structopt(name = "Memory Profiling Utility")] +struct MemProfileOpt { + /// Cargo profile to use in dfbench (e.g. release, release-nonlto) + #[structopt(long, default_value = "release")] + bench_profile: String, + + #[structopt(subcommand)] + command: Options, +} + +#[derive(Debug, StructOpt)] +#[structopt(about = "Benchmark command")] +enum Options { + Clickbench(clickbench::RunOpt), + H2o(h2o::RunOpt), + Imdb(imdb::RunOpt), + SortTpch(sort_tpch::RunOpt), + Tpch(tpch::RunOpt), +} + +#[tokio::main] +pub async fn main() -> Result<()> { + // 1. Parse args and check which benchmarks should be run + let mem_profile_opt = MemProfileOpt::from_args(); + let profile = mem_profile_opt.bench_profile; + let query_range = match mem_profile_opt.command { + Options::Clickbench(opt) => { + let entries = std::fs::read_dir(&opt.queries_path)? + .filter_map(Result::ok) + .filter(|e| { + let path = e.path(); + path.extension().map(|ext| ext == "sql").unwrap_or(false) + }) + .collect::<Vec<_>>(); + + let max_query_id = entries.len().saturating_sub(1); + match opt.query { + Some(query_id) => query_id..=query_id, + None => 0..=max_query_id, + } + } + Options::H2o(opt) => { + let queries = AllQueries::try_new(&opt.queries_path)?; + match opt.query { + Some(query_id) => query_id..=query_id, + None => queries.min_query_id()..=queries.max_query_id(), + } + } + Options::Imdb(opt) => match opt.query { + Some(query_id) => query_id..=query_id, + None => imdb::IMDB_QUERY_START_ID..=imdb::IMDB_QUERY_END_ID, + }, + Options::SortTpch(opt) => match opt.query { + Some(query_id) => query_id..=query_id, + None => { + sort_tpch::SORT_TPCH_QUERY_START_ID..=sort_tpch::SORT_TPCH_QUERY_END_ID + } + }, + Options::Tpch(opt) => match opt.query { + Some(query_id) => query_id..=query_id, + None => tpch::TPCH_QUERY_START_ID..=tpch::TPCH_QUERY_END_ID, + }, + }; + + // 2. Prebuild dfbench binary so that memory does not blow up due to build process + println!("Pre-building benchmark binary..."); + let status = Command::new("cargo") + .args([ + "build", + "--profile", + &profile, + "--features", + "mimalloc_extended", + "--bin", + "dfbench", + ]) + .status() + .expect("Failed to build dfbench"); + assert!(status.success()); + println!("Benchmark binary built successfully."); + + // 3. Create a new process per each benchmark query and print summary + // Find position of subcommand to collect args for dfbench + let args: Vec<_> = env::args().collect(); + let subcommands = ["tpch", "clickbench", "h2o", "imdb", "sort-tpch"]; + let sub_pos = args + .iter() + .position(|s| subcommands.iter().any(|&cmd| s == cmd)) + .expect("No benchmark subcommand found"); + + // Args starting from subcommand become dfbench args + let mut dfbench_args: Vec<String> = + args[sub_pos..].iter().map(|s| s.to_string()).collect(); + + run_benchmark_as_child_process(&profile, query_range, &mut dfbench_args)?; + + Ok(()) +} + +fn run_benchmark_as_child_process( + profile: &str, + query_range: std::ops::RangeInclusive<usize>, + args: &mut Vec<String>, +) -> Result<()> { + let mut query_strings: Vec<String> = Vec::new(); + for i in query_range { + query_strings.push(i.to_string()); + } + + let target_dir = + env::var("CARGO_TARGET_DIR").unwrap_or_else(|_| "target".to_string()); + let command = format!("{target_dir}/{profile}/dfbench"); + // Check whether benchmark binary exists + if !Path::new(&command).exists() { + panic!( + "Benchmark binary not found: `{command}`\nRun this command from the top-level `datafusion/` directory so `target/{profile}/dfbench` can be found.", + ); + } + args.insert(0, command); + let mut results = vec![]; + + // Run Single Query (args already contain --query num) + if args.contains(&"--query".to_string()) { + let _ = run_query(args, &mut results); + print_summary_table(&results); + return Ok(()); + } + + // Run All Queries + args.push("--query".to_string()); + for query_str in query_strings { + args.push(query_str); + let _ = run_query(args, &mut results); + args.pop(); + } + + print_summary_table(&results); + Ok(()) +} + +fn run_query(args: &[String], results: &mut Vec<QueryResult>) -> Result<()> { + let exec_path = &args[0]; + let exec_args = &args[1..]; + + let mut child = Command::new(exec_path) + .args(exec_args) + .stdout(Stdio::piped()) + .spawn() + .expect("Failed to start benchmark"); + + let stdout = child.stdout.take().unwrap(); + let reader = BufReader::new(stdout); + + // Buffer child's stdout + let lines: Result<Vec<String>, std::io::Error> = + reader.lines().collect::<Result<_, _>>(); + + child + .wait() + .expect("Benchmark process exited with an error"); + + // Parse after child process terminates + let lines = lines?; + let mut iter = lines.iter().peekable(); + + // Look for lines that contain execution time / memory stats + while let Some(line) = iter.next() { + if let Some((query, duration_ms)) = parse_query_time(line) { + if let Some(next_line) = iter.peek() { + if let Some((peak_rss, peak_commit, page_faults)) = + parse_vm_line(next_line) + { + results.push(QueryResult { + query, + duration_ms, + peak_rss, + peak_commit, + page_faults, + }); + break; + } + } + } + } + + Ok(()) +} + +#[derive(Debug)] +struct QueryResult { + query: usize, + duration_ms: f64, + peak_rss: String, + peak_commit: String, + page_faults: String, +} + +fn parse_query_time(line: &str) -> Option<(usize, f64)> { + let re = regex::Regex::new(r"Query (\d+) avg time: ([\d.]+) ms").unwrap(); + if let Some(caps) = re.captures(line) { + let query_id = caps[1].parse::<usize>().ok()?; + let avg_time = caps[2].parse::<f64>().ok()?; + Some((query_id, avg_time)) + } else { + None + } +} + +fn parse_vm_line(line: &str) -> Option<(String, String, String)> { + let re = regex::Regex::new( + r"Peak RSS:\s*([\d.]+\s*[A-Z]+),\s*Peak Commit:\s*([\d.]+\s*[A-Z]+),\s*Page Faults:\s*([\d.]+)" + ).ok()?; + let caps = re.captures(line)?; + let peak_rss = caps.get(1)?.as_str().to_string(); + let peak_commit = caps.get(2)?.as_str().to_string(); + let page_faults = caps.get(3)?.as_str().to_string(); + Some((peak_rss, peak_commit, page_faults)) +} + +// Print as simple aligned table +fn print_summary_table(results: &[QueryResult]) { + println!( + "\n{:<8} {:>10} {:>12} {:>12} {:>12}", + "Query", "Time (ms)", "Peak RSS", "Peak Commit", "Page Faults" Review Comment: Does "page fault" here refer to major page faults? It might be clearer if we specify it as a "major page fault". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org