[
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074641#comment-17074641
]
Maximilian Michels commented on BEAM-8944:
------------------------------------------
Thank you for your thoughts, Luke. I agree that the best approach is to work on
a solution based on profiling the current code.
Here is my attempt at a benchmark that closely resembles our setup: there are
at most two active bundles per SDK harness (remember, we use 16 of them and
distribute bundles round-robin), hence two active tasks. This is adjustable via
the {{max_concurrent_threads}} parameter of {{run_benchmark}}:
{code:python}
import time
from concurrent import futures
from future.moves import queue

from utils.thread_pool_executor import UnboundedThreadPoolExecutor

import cProfile as profiler


def run_benchmark(executor,
                  max_concurrent_threads=2,
                  total_iterations=10000,
                  warmup_iterations=100):
  q = queue.Queue()
  for i in range(max_concurrent_threads):
    q.put(i)

  def task(number):
    # Hand the "permit" back to the queue so the next submit can proceed
    # (the queued value itself is irrelevant).
    q.put(task)
    return number

  count = 0
  start_time = None
  profile = profiler.Profile()
  while count < total_iterations:
    if count == warmup_iterations:
      start_time = time.time()
      profile.enable()
    # Blocks until one of the max_concurrent_threads permits is free.
    executor.submit(task, q.get(block=True))
    count += 1
  profile.disable()
  print('%s uses %s' % (executor, time.time() - start_time))
  profile.print_stats(sort='time')


if __name__ == '__main__':
  with UnboundedThreadPoolExecutor() as executor:
    run_benchmark(executor)
  with futures.ThreadPoolExecutor(max_workers=1) as executor:
    run_benchmark(executor)
  with futures.ThreadPoolExecutor(max_workers=12) as executor:
    run_benchmark(executor)
{code}
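For anyone re-running this on Python 3, here is a rough sketch of the same benchmark (a hypothetical port, not the code that produced the numbers below: profiling is stripped and Beam's {{UnboundedThreadPoolExecutor}} is omitted). The queue is seeded with one entry per allowed in-flight task and each task puts its entry back, so it acts as a pool of permits:

```python
# Rough Python 3 sketch of the benchmark above (hypothetical port; the
# original runs on Python 2 and also exercises Beam's
# UnboundedThreadPoolExecutor, which is omitted here).
import queue
import time
from concurrent import futures


def run_benchmark(executor, max_concurrent_threads=2,
                  total_iterations=10000, warmup_iterations=100):
    q = queue.Queue()
    for i in range(max_concurrent_threads):
        q.put(i)  # seed the permits: at most this many tasks in flight

    def task(number):
        q.put(number)  # hand the permit back so the next submit can proceed
        return number

    start_time = None
    for count in range(total_iterations):
        if count == warmup_iterations:
            start_time = time.time()  # start timing after warm-up
        # q.get blocks until a permit is available, capping concurrency.
        executor.submit(task, q.get(block=True))
    return time.time() - start_time


if __name__ == '__main__':
    for workers in (1, 12):
        with futures.ThreadPoolExecutor(max_workers=workers) as executor:
            print('%d workers: %.3fs' % (workers, run_benchmark(executor)))
```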
The results clearly reveal the cost of the current implementation:
{noformat}
<utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x10da1f750> uses 7.32124900818
         575749 function calls in 7.302 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    83878    6.701    0.000    6.701    0.000 {method 'acquire' of 'thread.lock' objects}
    19800    0.087    0.000    6.958    0.000 Queue.py:150(get)
     8595    0.064    0.000    6.662    0.001 threading.py:309(wait)
    29698    0.062    0.000    0.157    0.000 threading.py:373(notify)
     9903    0.045    0.000    0.086    0.000 threading.py:260(__init__)
     9900    0.043    0.000    0.481    0.000 thread_pool_executor.py:134(submit)
     9899    0.040    0.000    0.176    0.000 thread_pool_executor.py:103(accepted_work)
    38293    0.028    0.000    0.099    0.000 threading.py:300(_is_owned)
     9899    0.025    0.000    0.136    0.000 threading.py:576(set)
     9900    0.020    0.000    0.020    0.000 {method '__enter__' of 'thread.lock' objects}
     9900    0.020    0.000    0.117    0.000 _base.py:318(__init__)
     9900    0.014    0.000    0.024    0.000 threading.py:132(__init__)
    38294    0.013    0.000    0.013    0.000 threading.py:64(_note)
    28394    0.013    0.000    0.018    0.000 Queue.py:200(_qsize)
    19806    0.012    0.000    0.012    0.000 threading.py:59(__init__)
    19799    0.012    0.000    0.014    0.000 Queue.py:208(_get)
     9899    0.011    0.000    0.076    0.000 threading.py:400(notifyAll)
     9903    0.011    0.000    0.098    0.000 threading.py:242(Condition)
     8595    0.010    0.000    0.053    0.000 threading.py:297(_acquire_restore)
    18499    0.009    0.000    0.009    0.000 {thread.allocate_lock}
     9900    0.009    0.000    0.033    0.000 threading.py:114(RLock)
    38294    0.009    0.000    0.009    0.000 {method 'release' of 'thread.lock' objects}
     9900    0.007    0.000    0.007    0.000 thread_pool_executor.py:34(__init__)
     9900    0.007    0.000    0.026    0.000 threading.py:285(__enter__)
    38293    0.007    0.000    0.007    0.000 {len}
     9900    0.007    0.000    0.008    0.000 threading.py:288(__exit__)
     9899    0.004    0.000    0.004    0.000 {method 'remove' of 'list' objects}
     8595    0.004    0.000    0.006    0.000 threading.py:294(_release_save)
     8595    0.003    0.000    0.003    0.000 {method 'append' of 'list' objects}
    19799    0.003    0.000    0.003    0.000 {method 'popleft' of 'collections.deque' objects}
     9900    0.002    0.000    0.002    0.000 {method '__exit__' of 'thread.lock' objects}
        1    0.000    0.000    0.000    0.000 {thread.start_new_thread}
        1    0.000    0.000    0.000    0.000 threading.py:647(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:717(start)
        2    0.000    0.000    0.000    0.000 threading.py:561(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:620(_newname)
        1    0.000    0.000    0.000    0.000 threading.py:597(wait)
        1    0.000    0.000    0.000    0.000 _weakrefset.py:83(add)
        1    0.000    0.000    0.000    0.000 thread_pool_executor.py:58(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:700(_set_daemon)
        1    0.000    0.000    0.000    0.000 threading.py:1143(currentThread)
        1    0.000    0.000    0.000    0.000 threading.py:1015(daemon)
        2    0.000    0.000    0.000    0.000 threading.py:542(Event)
        1    0.000    0.000    0.000    0.000 {thread.get_ident}
        1    0.000    0.000    0.000    0.000 {method 'add' of 'set' objects}
        2    0.000    0.000    0.000    0.000 threading.py:570(isSet)
        1    0.000    0.000    0.000    0.000 threading.py:999(daemon)
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
<concurrent.futures.thread.ThreadPoolExecutor object at 0x10da12110> uses 0.855124950409
         419751 function calls in 0.840 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    53064    0.452    0.000    0.452    0.000 {method 'acquire' of 'thread.lock' objects}
     9900    0.063    0.000    0.063    0.000 {method '__enter__' of 'thread.lock' objects}
     9900    0.047    0.000    0.385    0.000 thread.py:128(submit)
     9900    0.031    0.000    0.455    0.000 Queue.py:150(get)
     9900    0.028    0.000    0.054    0.000 threading.py:260(__init__)
    19800    0.024    0.000    0.072    0.000 threading.py:373(notify)
     9900    0.024    0.000    0.103    0.000 threading.py:440(acquire)
     9900    0.023    0.000    0.145    0.000 Queue.py:107(put)
     3366    0.016    0.000    0.314    0.000 threading.py:309(wait)
    26329    0.015    0.000    0.015    0.000 {method 'release' of 'thread.lock' objects}
     9900    0.015    0.000    0.078    0.000 _base.py:318(__init__)
    23166    0.012    0.000    0.045    0.000 threading.py:300(_is_owned)
     9900    0.010    0.000    0.016    0.000 threading.py:132(__init__)
     9900    0.009    0.000    0.063    0.000 threading.py:242(Condition)
     9900    0.007    0.000    0.111    0.000 thread.py:141(_adjust_thread_count)
    19800    0.007    0.000    0.007    0.000 threading.py:59(__init__)
     9900    0.006    0.000    0.022    0.000 threading.py:114(RLock)
     9900    0.006    0.000    0.069    0.000 threading.py:285(__enter__)
    33066    0.006    0.000    0.006    0.000 threading.py:64(_note)
     9900    0.005    0.000    0.005    0.000 thread.py:52(__init__)
     9900    0.005    0.000    0.008    0.000 threading.py:288(__exit__)
     9900    0.005    0.000    0.006    0.000 Queue.py:204(_put)
    13266    0.004    0.000    0.006    0.000 Queue.py:200(_qsize)
     9900    0.004    0.000    0.005    0.000 Queue.py:208(_get)
    13266    0.004    0.000    0.004    0.000 {thread.allocate_lock}
     9900    0.003    0.000    0.003    0.000 {method '__exit__' of 'thread.lock' objects}
     3366    0.002    0.000    0.007    0.000 threading.py:297(_acquire_restore)
     3366    0.002    0.000    0.002    0.000 threading.py:294(_release_save)
    13266    0.002    0.000    0.002    0.000 {len}
     3163    0.001    0.000    0.001    0.000 {method 'remove' of 'list' objects}
     9900    0.001    0.000    0.001    0.000 {method 'append' of 'collections.deque' objects}
     3366    0.001    0.000    0.001    0.000 {method 'append' of 'list' objects}
     9900    0.001    0.000    0.001    0.000 {method 'popleft' of 'collections.deque' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
<concurrent.futures.thread.ThreadPoolExecutor object at 0x10da18e50> uses 1.18780708313
         409313 function calls in 1.168 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    46328    0.518    0.000    0.518    0.000 {method 'acquire' of 'thread.lock' objects}
     9900    0.103    0.000    0.728    0.000 thread.py:128(submit)
     9900    0.095    0.000    0.095    0.000 {method '__enter__' of 'thread.lock' objects}
     9900    0.067    0.000    0.187    0.000 threading.py:440(acquire)
    19800    0.054    0.000    0.135    0.000 threading.py:373(notify)
     9900    0.052    0.000    0.440    0.000 Queue.py:150(get)
     9900    0.038    0.000    0.328    0.000 Queue.py:107(put)
    31214    0.035    0.000    0.035    0.000 {method 'release' of 'thread.lock' objects}
     9900    0.033    0.000    0.061    0.000 threading.py:260(__init__)
     9900    0.020    0.000    0.090    0.000 _base.py:318(__init__)
    21482    0.017    0.000    0.062    0.000 threading.py:300(_is_owned)
     9900    0.015    0.000    0.202    0.000 thread.py:141(_adjust_thread_count)
     1682    0.015    0.000    0.184    0.000 threading.py:309(wait)
     9900    0.011    0.000    0.106    0.000 threading.py:285(__enter__)
     9900    0.011    0.000    0.017    0.000 threading.py:132(__init__)
    31382    0.010    0.000    0.010    0.000 threading.py:64(_note)
    11582    0.010    0.000    0.011    0.000 Queue.py:200(_qsize)
     9900    0.009    0.000    0.070    0.000 threading.py:242(Condition)
    19800    0.008    0.000    0.008    0.000 threading.py:59(__init__)
     9900    0.007    0.000    0.024    0.000 threading.py:114(RLock)
     9900    0.006    0.000    0.007    0.000 Queue.py:204(_put)
     9900    0.005    0.000    0.005    0.000 thread.py:52(__init__)
     9900    0.005    0.000    0.010    0.000 threading.py:288(__exit__)
     9900    0.005    0.000    0.006    0.000 Queue.py:208(_get)
     9900    0.004    0.000    0.004    0.000 {method '__exit__' of 'thread.lock' objects}
    11582    0.004    0.000    0.004    0.000 {thread.allocate_lock}
     9732    0.004    0.000    0.004    0.000 {method 'remove' of 'list' objects}
     1682    0.002    0.000    0.003    0.000 threading.py:294(_release_save)
    11582    0.002    0.000    0.002    0.000 {len}
     1682    0.002    0.000    0.033    0.000 threading.py:297(_acquire_restore)
     9900    0.001    0.000    0.001    0.000 {method 'append' of 'collections.deque' objects}
     9900    0.001    0.000    0.001    0.000 {method 'popleft' of 'collections.deque' objects}
     1682    0.001    0.000    0.001    0.000 {method 'append' of 'list' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
{noformat}
It appears that the locking required per work item is very expensive: in the
{{UnboundedThreadPoolExecutor}} run, {{'acquire' of 'thread.lock'}} alone
accounts for 6.7 of the 7.3 seconds, versus 0.45 seconds in the single-worker
{{ThreadPoolExecutor}} run.
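Part of that per-item cost is structural rather than pure contention: every {{submit()}} allocates a fresh {{Future}}, whose constructor builds a new {{Condition}} and {{RLock}} (the {{_base.py:318(__init__)}} and {{threading.py:260(__init__)}} rows above). A quick sketch (not Beam code) isolating just that allocation cost next to a bare uncontended lock round-trip; absolute numbers are machine-dependent:

```python
# Sketch isolating the per-submit allocation cost: each
# concurrent.futures.Future builds a new threading.Condition (which in
# turn allocates an RLock), matching the Condition-__init__ rows in the
# profiles above.
import threading
import timeit

N = 100000

# Cost of constructing the Condition that every Future carries.
condition_cost = timeit.timeit(threading.Condition, number=N)

# Baseline: one uncontended acquire/release on a plain lock.
lock = threading.Lock()


def acquire_release():
    lock.acquire()
    lock.release()


acquire_release_cost = timeit.timeit(acquire_release, number=N)

print('Condition() x%d:     %.3fs' % (N, condition_cost))
print('acquire/release x%d: %.3fs' % (N, acquire_release_cost))
```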
> Python SDK harness performance degradation with UnboundedThreadPoolExecutor
> ---------------------------------------------------------------------------
>
> Key: BEAM-8944
> URL: https://issues.apache.org/jira/browse/BEAM-8944
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Affects Versions: 2.18.0
> Reporter: Yichi Zhang
> Priority: Critical
> Attachments: checkpoint-duration.png, profiling.png,
> profiling_one_thread.png, profiling_twelve_threads.png
>
> Time Spent: 4h 20m
> Remaining Estimate: 0h
>
> We are seeing a performance degradation for Python streaming word count load
> tests.
>
> After some investigation, it appears to be caused by swapping the original
> ThreadPoolExecutor for UnboundedThreadPoolExecutor in the SDK worker. The
> suspicion is that Python performance is worse with more threads on CPU-bound
> tasks.
>
> A simple test comparing the performance of the different thread pool
> executors:
>
> {code:python}
> def test_performance(self):
>   def run_perf(executor):
>     total_number = 1000000
>     q = queue.Queue()
>
>     def task(number):
>       hash(number)
>       q.put(number + 200)
>       return number
>
>     t = time.time()
>     count = 0
>     for i in range(200):
>       q.put(i)
>     while count < total_number:
>       executor.submit(task, q.get(block=True))
>       count += 1
>     print('%s uses %s' % (executor, time.time() - t))
>
>   with UnboundedThreadPoolExecutor() as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=1) as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=12) as executor:
>     run_perf(executor)
> {code}
> Results:
> {noformat}
> <apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7fab400dbe50> uses 268.160675049
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab40096290> uses 79.904583931
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab400dbe50> uses 191.179054976
> {noformat}
> Profiling:
> UnboundedThreadPoolExecutor:
> !profiling.png!
> 1 Thread ThreadPoolExecutor:
> !profiling_one_thread.png!
> 12 Threads ThreadPoolExecutor:
> !profiling_twelve_threads.png!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)