Weijun-H commented on PR #14411: URL: https://github.com/apache/datafusion/pull/14411#issuecomment-2646145904
> Impressive work! I have a suggestion and a high-level question:
>
> ### Suggestion
> I think that to justify this change, we have to make sure of two things:
>
> * No performance regression (the benchmarks already show this)
> * A reduced memory footprint for queries in which batches can accumulate in `RepartitionExec` (as the original issue described)
>
> I checked the memory usage for `tpch-sf10` and `clickbench`, and there is no noticeable change for those queries. Perhaps we should construct queries with this anti-pattern and demonstrate that memory usage can actually be reduced by this on-demand repartition executor?
>
> Here is a script for checking memory usage in benchmark queries:
>
> ```python
> # This script should be placed under benchmarks/
> #
> # Supported benchmarks are 'tpch' and 'clickbench'
> #
> # Example usage:
> #   Run the TPCH benchmark and save results:
> #     python3 membench.py run --benchmark tpch --result tpch_main.csv
> #     python3 membench.py run --benchmark tpch --result tpch_optimized.csv
> #
> #   Compare results:
> #     python3 membench.py compare tpch_main.csv tpch_optimized.csv
>
> import argparse
> import csv
> import re
> import subprocess
>
> def human_readable_size(size):
>     units = ["B", "K", "M", "G", "T"]
>     index = 0
>     while size >= 1024 and index < len(units) - 1:
>         size /= 1024.0
>         index += 1
>     return f"{size:.2f}{units[index]}"
>
> def run_tpch_queries(result_file):
>     results = []
>     for query in range(1, 23):
>         cmd = [
>             "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", "dfbench",
>             "tpch", "--format", "parquet", "--path", "./data/tpch_sf10",
>             "--query", str(query), "--iterations", "1"
>         ]
>
>         process = subprocess.run(cmd, capture_output=True, text=True, shell=False)
>         stderr_output = process.stderr
>
>         # `/usr/bin/time -l` (macOS) reports max RSS in bytes; on Linux,
>         # use `-v` and match "Maximum resident set size" instead.
>         match = re.search(r"(\d+)\s+maximum resident set size", stderr_output)
>         max_rss = human_readable_size(int(match.group(1))) if match else "N/A"
>         results.append((query, max_rss))
>
>     with open(result_file, "w", newline='') as f:
>         writer = csv.writer(f)
>         writer.writerow(["Query", "Memory"])
>         writer.writerows(results)
>
>     print(f"Results saved to {result_file}")
>
> def run_clickbench_queries(result_file):
>     results = []
>     for query in range(0, 43):
>         cmd = [
>             "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", "dfbench",
>             "clickbench", "--path", "./data/hits.parquet",
>             "--queries-path", "./queries/clickbench/queries.sql",
>             "--query", str(query), "--iterations", "1"
>         ]
>
>         process = subprocess.run(cmd, capture_output=True, text=True, shell=False)
>         stderr_output = process.stderr
>
>         match = re.search(r"(\d+)\s+maximum resident set size", stderr_output)
>         max_rss = human_readable_size(int(match.group(1))) if match else "N/A"
>         results.append((query, max_rss))
>
>     with open(result_file, "w", newline='') as f:
>         writer = csv.writer(f)
>         writer.writerow(["Query", "Memory"])
>         writer.writerows(results)
>
>     print(f"Results saved to {result_file}")
>
> def compare_results(file1, file2):
>     results1, results2 = {}, {}
>
>     with open(file1, "r") as f1, open(file2, "r") as f2:
>         reader1, reader2 = csv.reader(f1), csv.reader(f2)
>         next(reader1)  # Skip header
>         next(reader2)  # Skip header
>
>         for row in reader1:
>             results1[row[0]] = row[1]
>         for row in reader2:
>             results2[row[0]] = row[1]
>
>     print(f"{'Query':<10}{'Branch1':<10}{'Branch2':<10}{'Change'}")
>     for query in results1:
>         mem1 = results1[query]
>         mem2 = results2.get(query, "N/A")
>
>         if mem1 != "N/A" and mem2 != "N/A":
>             # Note: this assumes both measurements carry the same unit
>             # suffix (e.g. both "M"); the ratio is meaningless otherwise.
>             size1 = float(mem1[:-1])
>             size2 = float(mem2[:-1])
>             ratio = size2 / size1 if size1 > 0 else 1.0
>             change = f"{ratio:.2f}X" if abs(ratio - 1) > 0.05 else "No Change"
>         else:
>             change = "N/A"
>
>         print(f"{query:<10}{mem1:<10}{mem2:<10}{change}")
>
> def main():
>     parser = argparse.ArgumentParser()
>     parser.add_argument("mode", choices=["run", "compare"], help="Run benchmarks or compare results")
>     parser.add_argument("--result", help="Output result file for benchmarking")
>     parser.add_argument("--benchmark", choices=["tpch", "clickbench"], help="Specify which benchmark to run")
>     parser.add_argument("file1", nargs="?", help="First result file for comparison")
>     parser.add_argument("file2", nargs="?", help="Second result file for comparison")
>     args = parser.parse_args()
>
>     if args.mode == "run" and args.result and args.benchmark:
>         if args.benchmark == "tpch":
>             run_tpch_queries(args.result)
>         elif args.benchmark == "clickbench":
>             run_clickbench_queries(args.result)
>     elif args.mode == "compare" and args.file1 and args.file2:
>         compare_results(args.file1, args.file2)
>     else:
>         print("Invalid arguments. Use --help for usage information.")
>
> if __name__ == "__main__":
>     main()
> ```
>
> Results:
>
> ```
> TPCH:
> ----
> Query   Branch1   Branch2   Change
> 1       464.05M   460.78M   No Change
> 2       397.00M   412.77M   No Change
> 3       714.56M   630.64M   0.88X
> 4       408.53M   418.78M   No Change
> 5       741.30M   769.73M   No Change
> 6       390.02M   398.72M   No Change
> 7       3.41G     3.45G     No Change
> 8       1.08G     1.05G     No Change
> 9       2.37G     2.31G     No Change
> 10      1.11G     1.16G     No Change
> 11      260.78M   267.41M   No Change
> 12      429.95M   449.06M   No Change
> 13      675.67M   668.22M   No Change
> 14      666.56M   700.22M   No Change
> 15      673.66M   656.70M   No Change
> 16      485.81M   474.59M   No Change
> 17      605.38M   631.92M   No Change
> 18      3.26G     3.29G     No Change
> 19      500.77M   577.95M   1.15X
> 20      1.07G     1.05G     No Change
> 21      982.59M   978.69M   No Change
> 22      303.86M   302.14M   No Change
>
> Clickbench:
> ...(no change)
> ```
>
> ### Question
> In my understanding, the new repartition executor is a wrapper around `RepartitionExec`. To enable lazy evaluation, it should support both `RoundRobin` and `Hash` repartitioning, right? This PR only swaps out `RoundRobin`; do you also plan to add on-demand hash repartitioning in the future?

Thank you for your exploration! Did you set `prefer_round_robin` to false to enable `OnDemandRepartition`? @2010YOUY01
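In case it helps reproduce: assuming the switch ends up exposed as a regular session config option, toggling it from the Python bindings could look like the sketch below. The key name `datafusion.optimizer.prefer_round_robin_repartition` is only my guess at how `prefer_round_robin` might surface; please check this PR's config changes for the real name.

```python
# Minimal sketch: disable round-robin repartitioning so the planner can
# pick OnDemandRepartitionExec instead (per this PR). The config key
# below is an assumption, not confirmed from the PR.
from datafusion import SessionContext

ctx = SessionContext()
# DataFusion session options can be changed with a SQL SET statement.
ctx.sql("SET datafusion.optimizer.prefer_round_robin_repartition = false")

# Plans created after this point should reflect the new setting.
ctx.sql("SELECT 1").collect()
```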
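And on constructing the suggested anti-pattern test: here is a rough sketch of the shape such a reproducer could take, again via the Python bindings. A single-partition source under a high `target_partitions` setting is where the optimizer inserts a round-robin `RepartitionExec` to add parallelism, which is the operator this PR replaces. The table, sizes, and query are illustrative, and whether batches actually pile up will depend on the relative producer/consumer speeds.

```python
import pyarrow as pa
from datafusion import SessionConfig, SessionContext

# High fan-out so the inserted round-robin RepartitionExec has many
# output partitions (the value 64 is illustrative).
ctx = SessionContext(SessionConfig().with_target_partitions(64))

# One inner list = one input partition, so the optimizer must
# repartition the scan to parallelize the aggregation.
batch = pa.RecordBatch.from_pydict(
    {"k": list(range(1_000_000)), "v": [x % 97 for x in range(1_000_000)]}
)
ctx.register_record_batches("t", [[batch]])

# Run this under `/usr/bin/time -l` (as in the script above) on both
# branches and compare peak RSS.
ctx.sql("SELECT sum(v * v) FROM t WHERE k % 3 = 0").collect()
```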