2010YOUY01 commented on PR #14411:
URL: https://github.com/apache/datafusion/pull/14411#issuecomment-2646119135
Impressive work! I have a suggestion and a high-level question:
### Suggestion
I think that to justify this change, we have to make sure of two things:
- No performance regression (the benchmarks already show this)
- A reduced memory footprint for queries where batches can accumulate in `RepartitionExec` (as the original issue describes)
I checked the memory usage for `tpch-sf10` and `clickbench`; there is no noticeable change for those queries. Perhaps we should construct queries with this anti-pattern and demonstrate that memory usage is actually reduced by the on-demand repartition executor? Something like the sketch below might trigger it.
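For example, a rough sketch using the `datafusion` Python bindings (the query shape is only my guess at the anti-pattern, and the file paths are hypothetical):

```python
# Rough sketch (not from this PR): a query shape that might let batches
# pile up in RepartitionExec. Assumes the `datafusion` Python bindings
# and a local copy of the ClickBench hits file.
from datafusion import SessionContext

ctx = SessionContext()
ctx.register_parquet("hits", "./data/hits.parquet")

# A global sort merges all output partitions at the end; if the merge
# drains some partitions faster than others, a push-based round-robin
# repartition can keep buffering batches for the slow ones.
ctx.sql('SELECT "UserID", "EventTime" FROM hits ORDER BY "EventTime"').collect()
```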
Here is the script I used to check memory usage across the benchmark queries:
<details>
<summary>membench.py</summary>
```python
# This script should be placed under benchmarks/
#
# Supported benchmarks are 'tpch' and 'clickbench'
#
# Example usage:
#   Run the TPCH benchmark and save results:
#     python3 membench.py run --benchmark tpch --result tpch_main.csv
#     python3 membench.py run --benchmark tpch --result tpch_optimized.csv
#
#   Compare results:
#     python3 membench.py compare tpch_main.csv tpch_optimized.csv
#
# Note: `/usr/bin/time -l` is the BSD/macOS variant; it reports the
# maximum resident set size in bytes.
import subprocess
import re
import csv
import argparse

SIZE_UNITS = ["B", "K", "M", "G", "T"]


def human_readable_size(size):
    """Format a byte count as e.g. '464.05M'."""
    index = 0
    while size >= 1024 and index < len(SIZE_UNITS) - 1:
        size /= 1024.0
        index += 1
    return f"{size:.2f}{SIZE_UNITS[index]}"


def parse_size(size_str):
    """Invert human_readable_size: '464.05M' -> bytes."""
    return float(size_str[:-1]) * (1024 ** SIZE_UNITS.index(size_str[-1]))


def measure_max_rss(cmd):
    """Run cmd under /usr/bin/time and return the max RSS, or 'N/A'."""
    process = subprocess.run(cmd, capture_output=True, text=True, shell=False)
    match = re.search(r"(\d+)\s+maximum resident set size", process.stderr)
    return human_readable_size(int(match.group(1))) if match else "N/A"


def save_results(results, result_file):
    with open(result_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["Query", "Memory"])
        writer.writerows(results)
    print(f"Results saved to {result_file}")


def run_tpch_queries(result_file):
    results = []
    for query in range(1, 23):
        cmd = [
            "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", "dfbench",
            "tpch", "--format", "parquet", "--path", "./data/tpch_sf10",
            "--query", str(query), "--iterations", "1",
        ]
        results.append((query, measure_max_rss(cmd)))
    save_results(results, result_file)


def run_clickbench_queries(result_file):
    results = []
    for query in range(0, 43):
        cmd = [
            "/usr/bin/time", "-l", "cargo", "run", "--release", "--bin", "dfbench",
            "clickbench", "--path", "./data/hits.parquet",
            "--queries-path", "./queries/clickbench/queries.sql",
            "--query", str(query), "--iterations", "1",
        ]
        results.append((query, measure_max_rss(cmd)))
    save_results(results, result_file)


def compare_results(file1, file2):
    results1, results2 = {}, {}
    with open(file1, "r") as f1, open(file2, "r") as f2:
        reader1, reader2 = csv.reader(f1), csv.reader(f2)
        next(reader1)  # Skip header
        next(reader2)  # Skip header
        for row in reader1:
            results1[row[0]] = row[1]
        for row in reader2:
            results2[row[0]] = row[1]
    print(f"{'Query':<10}{'Branch1':<10}{'Branch2':<10}{'Change'}")
    for query in results1:
        mem1 = results1[query]
        mem2 = results2.get(query, "N/A")
        if mem1 != "N/A" and mem2 != "N/A":
            # Convert back to bytes so sizes with different units
            # (e.g. '982.59M' vs '1.01G') compare correctly.
            size1 = parse_size(mem1)
            size2 = parse_size(mem2)
            ratio = size2 / size1 if size1 > 0 else 1.0
            change = f"{ratio:.2f}X" if abs(ratio - 1) > 0.05 else "No Change"
        else:
            change = "N/A"
        print(f"{query:<10}{mem1:<10}{mem2:<10}{change}")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("mode", choices=["run", "compare"],
                        help="Run benchmarks or compare results")
    parser.add_argument("--result", help="Output result file for benchmarking")
    parser.add_argument("--benchmark", choices=["tpch", "clickbench"],
                        help="Specify which benchmark to run")
    parser.add_argument("file1", nargs="?", help="First result file for comparison")
    parser.add_argument("file2", nargs="?", help="Second result file for comparison")
    args = parser.parse_args()
    if args.mode == "run" and args.result and args.benchmark:
        if args.benchmark == "tpch":
            run_tpch_queries(args.result)
        elif args.benchmark == "clickbench":
            run_clickbench_queries(args.result)
    elif args.mode == "compare" and args.file1 and args.file2:
        compare_results(args.file1, args.file2)
    else:
        print("Invalid arguments. Use --help for usage information.")


if __name__ == "__main__":
    main()
```
</details>
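One caveat: the script relies on BSD/macOS `time -l`, which reports max RSS in bytes. On Linux, GNU time takes `-v` and reports kilobytes, so the parsing would need a tweak along these lines (a sketch, assuming GNU time's output format):

```python
import re

def parse_gnu_time_max_rss(stderr_output: str):
    """Extract max RSS (in bytes) from GNU `time -v` output, which prints
    a line like 'Maximum resident set size (kbytes): 123456'."""
    match = re.search(r"Maximum resident set size \(kbytes\):\s*(\d+)", stderr_output)
    return int(match.group(1)) * 1024 if match else None
```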
Results:
```
TPCH:
----
Query Branch1 Branch2 Change
1 464.05M 460.78M No Change
2 397.00M 412.77M No Change
3 714.56M 630.64M 0.88X
4 408.53M 418.78M No Change
5 741.30M 769.73M No Change
6 390.02M 398.72M No Change
7 3.41G 3.45G No Change
8 1.08G 1.05G No Change
9 2.37G 2.31G No Change
10 1.11G 1.16G No Change
11 260.78M 267.41M No Change
12 429.95M 449.06M No Change
13 675.67M 668.22M No Change
14 666.56M 700.22M No Change
15 673.66M 656.70M No Change
16 485.81M 474.59M No Change
17 605.38M 631.92M No Change
18 3.26G 3.29G No Change
19 500.77M 577.95M 1.15X
20 1.07G 1.05G No Change
21 982.59M 978.69M No Change
22 303.86M 302.14M No Change
Clickbench:
...(no change)
```
### Question
In my understanding, the new repartition executor is a wrapper around `RepartitionExec`. To enable lazy evaluation, it should support both `RoundRobin` and `Hash` repartitioning, right? This PR only swaps in `RoundRobin`; do you also plan to add on-demand hash repartitioning in the future?
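(To illustrate what I mean by lazy evaluation, here is a toy Python sketch of the pull-based idea, not DataFusion's actual implementation: input is consumed only when some output partition asks for a batch, with small per-partition buffers holding rows destined for partitions nobody has polled yet.)

```python
from collections import deque

def on_demand_hash_repartition(batches, num_partitions, key):
    """Toy pull-based hash repartition (illustration only): input batches
    are pulled only when the requesting partition's buffer is empty."""
    it = iter(batches)
    buffers = [deque() for _ in range(num_partitions)]

    def partition(idx):
        while True:
            if buffers[idx]:
                yield buffers[idx].popleft()
            else:
                batch = next(it, None)  # pull more input only on demand
                if batch is None:
                    return
                for row in batch:
                    buffers[hash(key(row)) % num_partitions].append(row)

    return [partition(i) for i in range(num_partitions)]

# Example: rows routed by their first field into 2 partitions.
parts = on_demand_hash_repartition([[(1, "a"), (2, "b")], [(3, "c")]], 2,
                                   key=lambda row: row[0])
print(list(parts[0]), list(parts[1]))
```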