Weijun-H commented on PR #14411:
URL: https://github.com/apache/datafusion/pull/14411#issuecomment-2646145904

   > Impressive work! I have a suggestion and a high-level question:
   > 
   > ### Suggestion
   > I think to justify this change, we have to make sure:
   > 
   > * No performance regression (the benchmarks already show this)
   > * Reduced memory footprint for queries in which batches can accumulate in `RepartitionExec` (as the original issue described)
   > 
   > I checked the memory usage for `tpch-sf10` and `clickbench`; there is no noticeable change for those queries. Perhaps we should construct queries with this anti-pattern and demonstrate that memory usage is actually reduced by the on-demand repartition executor?
   > 
   > Here is a script for checking memory usage in benchmark queries
   > 
   > ```python
   > # This script should be placed under benchmarks/
   > #
   > # Supported benchmarks are 'tpch' and 'clickbench'
   > #
   > # Example usage:
   > # Run TPCH benchmark and save results:
   > #   python3 membench.py run --benchmark tpch --result tpch_main.csv
   > #   python3 membench.py run --benchmark tpch --result tpch_optimized.csv
   > #
   > # Compare results:
   > #   python3 membench.py compare tpch_main.csv tpch_optimized.csv
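    > #
    > # Note: '/usr/bin/time -l' and the '<bytes> maximum resident set size' output
    > # parsed below are the BSD/macOS format of `time`; GNU time on Linux uses '-v'
    > # and reports 'Maximum resident set size (kbytes)', so the flag and the regex
    > # would need adjusting on Linux.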
   > 
   > import subprocess
   > import re
   > import csv
   > import argparse
   > 
   > def human_readable_size(size):
   >     units = ["B", "K", "M", "G", "T"]
   >     index = 0
   >     while size >= 1024 and index < len(units) - 1:
   >         size /= 1024.0
   >         index += 1
   >     return f"{size:.2f}{units[index]}"
   > 
   > def run_tpch_queries(label, result_file):
   >     results = []
   >     for query in range(1, 23):
   >         cmd = [
    >             "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", "dfbench",
   >             "tpch", "--format", "parquet", "--path", "./data/tpch_sf10", 
   >             "--query", str(query), "--iterations", "1"
   >         ]
   >         
    >         process = subprocess.run(cmd, capture_output=True, text=True, shell=False)
   >         stderr_output = process.stderr
   >         
    >         match = re.search(r"(\d+)\s+maximum resident set size", stderr_output)
    >         max_rss = human_readable_size(int(match.group(1))) if match else "N/A"
   >         results.append((query, max_rss))
   >     
   >     with open(result_file, "w", newline='') as f:
   >         writer = csv.writer(f)
   >         writer.writerow(["Query", "Memory"])
   >         writer.writerows(results)
   >     
   >     print(f"Results saved to {result_file}")
   > 
   > def run_clickbench_queries(label, result_file):
   >     results = []
   >     for query in range(0, 43):
   >         cmd = [
    >             "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", "dfbench",
   >             "clickbench", "--path", "./data/hits.parquet", 
   >             "--queries-path", "./queries/clickbench/queries.sql", 
   >             "--query", str(query), "--iterations", "1"
   >         ]
   >         
    >         process = subprocess.run(cmd, capture_output=True, text=True, shell=False)
   >         stderr_output = process.stderr
   >         
    >         match = re.search(r"(\d+)\s+maximum resident set size", stderr_output)
    >         max_rss = human_readable_size(int(match.group(1))) if match else "N/A"
   >         results.append((query, max_rss))
   >     
   >     with open(result_file, "w", newline='') as f:
   >         writer = csv.writer(f)
   >         writer.writerow(["Query", "Memory"])
   >         writer.writerows(results)
   >     
   >     print(f"Results saved to {result_file}")
   > 
   > def compare_results(file1, file2):
   >     results1, results2 = {}, {}
   >     
   >     with open(file1, "r") as f1, open(file2, "r") as f2:
   >         reader1, reader2 = csv.reader(f1), csv.reader(f2)
   >         next(reader1)  # Skip header
   >         next(reader2)  # Skip header
   >         
   >         for row in reader1:
   >             results1[row[0]] = row[1]
   >         for row in reader2:
   >             results2[row[0]] = row[1]
   >     
   >     print(f"{'Query':<10}{'Branch1':<10}{'Branch2':<10}{'Change'}")
   >     for query in results1:
   >         mem1 = results1[query]
   >         mem2 = results2.get(query, "N/A")
   >         
    >         if mem1 != "N/A" and mem2 != "N/A":
    >             # Convert both values back to bytes so sizes reported in
    >             # different units (e.g. M vs. G) compare correctly
    >             units = {"B": 1, "K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4}
    >             size1 = float(mem1[:-1]) * units[mem1[-1]]
    >             size2 = float(mem2[:-1]) * units[mem2[-1]]
    >             ratio = size2 / size1 if size1 > 0 else 1.0
    >             change = f"{ratio:.2f}X" if abs(ratio - 1) > 0.05 else "No Change"
   >         else:
   >             change = "N/A"
   >         
   >         print(f"{query:<10}{mem1:<10}{mem2:<10}{change}")
   > 
   > def main():
   >     parser = argparse.ArgumentParser()
    >     parser.add_argument("mode", choices=["run", "compare"], help="Run benchmarks or compare results")
    >     parser.add_argument("--result", help="Output result file for benchmarking")
    >     parser.add_argument("--benchmark", choices=["tpch", "clickbench"], help="Specify which benchmark to run")
    >     parser.add_argument("file1", nargs="?", help="First result file for comparison")
    >     parser.add_argument("file2", nargs="?", help="Second result file for comparison")
   >     args = parser.parse_args()
   >     
   >     if args.mode == "run" and args.result and args.benchmark:
   >         if args.benchmark == "tpch":
   >             run_tpch_queries("run", args.result)
   >         elif args.benchmark == "clickbench":
   >             run_clickbench_queries("run", args.result)
   >     elif args.mode == "compare" and args.file1 and args.file2:
   >         compare_results(args.file1, args.file2)
   >     else:
   >         print("Invalid arguments. Use --help for usage information.")
   > 
   > if __name__ == "__main__":
   >     main()
   > ```
   > 
   > Results:
   > 
   > ```
   > TPCH:
   > ----
   > Query     Branch1   Branch2   Change
   > 1         464.05M   460.78M   No Change
   > 2         397.00M   412.77M   No Change
   > 3         714.56M   630.64M   0.88X
   > 4         408.53M   418.78M   No Change
   > 5         741.30M   769.73M   No Change
   > 6         390.02M   398.72M   No Change
   > 7         3.41G     3.45G     No Change
   > 8         1.08G     1.05G     No Change
   > 9         2.37G     2.31G     No Change
   > 10        1.11G     1.16G     No Change
   > 11        260.78M   267.41M   No Change
   > 12        429.95M   449.06M   No Change
   > 13        675.67M   668.22M   No Change
   > 14        666.56M   700.22M   No Change
   > 15        673.66M   656.70M   No Change
   > 16        485.81M   474.59M   No Change
   > 17        605.38M   631.92M   No Change
   > 18        3.26G     3.29G     No Change
   > 19        500.77M   577.95M   1.15X
   > 20        1.07G     1.05G     No Change
   > 21        982.59M   978.69M   No Change
   > 22        303.86M   302.14M   No Change
   > 
   > Clickbench:
   > ...(no change)
   > ```
   > 
   > ### Question
   > In my understanding, the new repartition executor is a wrapper around `RepartitionExec`; to enable lazy evaluation, it should support both `RoundRobin` and `Hash` repartitioning, right? This PR only swaps in the `RoundRobin` case, so do you also plan to add on-demand hash repartitioning in the future?
   
   
   Thank you for your exploration! Did you set `prefer_round_robin` to `false` to enable `OnDemandRepartition`? @2010YOUY01
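
   For reference, here is a minimal sketch of how that option could be toggled programmatically when reproducing the memory measurements. The `prefer_round_robin` name comes from the discussion above; the exact config key/namespace used below is an assumption and depends on how this PR exposes it:

   ```rust
   use datafusion::prelude::{SessionConfig, SessionContext};

   fn main() {
       // Assumed key: the discussion refers to a `prefer_round_robin` option;
       // the exact namespace/key in this PR may differ.
       let config = SessionConfig::new()
           .set_bool("datafusion.optimizer.prefer_round_robin", false);

       // Queries planned through this context should then take the on-demand
       // repartitioning path instead of `RoundRobinBatch`.
       let _ctx = SessionContext::new_with_config(config);
   }
   ```

   Assuming the same key, `SET datafusion.optimizer.prefer_round_robin = false;` in `datafusion-cli` should have the equivalent effect before running a benchmark query.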

