[
https://issues.apache.org/jira/browse/BEAM-8944?focusedWorklogId=429650&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-429650
]
ASF GitHub Bot logged work on BEAM-8944:
----------------------------------------
Author: ASF GitHub Bot
Created on: 01/May/20 20:31
Start Date: 01/May/20 20:31
Worklog Time Spent: 10m
Work Description: lukecwik opened a new pull request #11590:
URL: https://github.com/apache/beam/pull/11590
The existing implementation was slow because it used timed waits and invoked
a larger number of "threading" objects.
Using the benchmark from
https://issues.apache.org/jira/browse/BEAM-8944?focusedCommentId=17074641&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17074641
this change improves performance from 5.52s to 1.82s, which is faster than
the ThreadPoolExecutor with 12 threads (the default used before) but still
slower than the ThreadPoolExecutor with 1 thread.
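For a rough sense of scale, the wall-clock figures reported in this
description can be turned into ratios. This is only a sketch: the dictionary
keys and the `ratio` helper are labels chosen here for illustration, not Beam
identifiers, and the numbers are copied from the benchmark output quoted in
this message.

```python
# Wall-clock seconds copied from the benchmark output in this description.
# The keys are illustrative labels, not names used by Beam.
timings = {
    "unbounded_before": 5.52247905731,
    "unbounded_after": 1.82196497917,
    "thread_pool_1": 1.03040599823,
    "thread_pool_12": 2.03954315186,
}


def ratio(slower, faster):
    """Return how many times longer `slower` took than `faster`."""
    return timings[slower] / timings[faster]


# Speedup of the new UnboundedThreadPoolExecutor over the old one.
print("before vs. after:       %.2fx" % ratio("unbounded_before", "unbounded_after"))
# Remaining gap to the single-threaded ThreadPoolExecutor.
print("after vs. 1 thread:     %.2fx" % ratio("unbounded_after", "thread_pool_1"))
# Margin over the 12-thread ThreadPoolExecutor.
print("12 threads vs. after:   %.2fx" % ratio("thread_pool_12", "unbounded_after"))
```

On these numbers the change is roughly a threefold improvement, while the
single-threaded executor remains clearly ahead.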
The prior performance was:
```
<apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object
at 0x7feee655c8d0> uses 5.52247905731
584051 function calls in 5.495 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
86250 4.386 0.000 4.386 0.000 {method 'acquire' of
'thread.lock' objects}
19800 0.150 0.000 4.773 0.000 Queue.py:150(get)
29698 0.123 0.000 0.365 0.000 threading.py:373(notify)
9900 0.114 0.000 0.114 0.000 {method '__enter__' of
'thread.lock' objects}
9899 0.106 0.000 0.463 0.000
thread_pool_executor.py:103(accepted_work)
9188 0.089 0.000 4.095 0.000 threading.py:309(wait)
9903 0.078 0.000 0.129 0.000 threading.py:260(__init__)
9900 0.061 0.000 1.092 0.000
thread_pool_executor.py:133(submit)
9899 0.055 0.000 0.358 0.000 threading.py:576(set)
38886 0.044 0.000 0.285 0.000 threading.py:300(_is_owned)
9900 0.041 0.000 0.186 0.000 _base.py:318(__init__)
28987 0.023 0.000 0.031 0.000 Queue.py:200(_qsize)
9900 0.022 0.000 0.030 0.000 threading.py:132(__init__)
38887 0.021 0.000 0.021 0.000 threading.py:64(_note)
9899 0.019 0.000 0.163 0.000 threading.py:400(notifyAll)
19799 0.018 0.000 0.023 0.000 Queue.py:208(_get)
38887 0.016 0.000 0.016 0.000 {method 'release' of
'thread.lock' objects}
9903 0.016 0.000 0.145 0.000 threading.py:242(Condition)
19806 0.014 0.000 0.014 0.000 threading.py:59(__init__)
9900 0.014 0.000 0.127 0.000 threading.py:285(__enter__)
9188 0.013 0.000 0.090 0.000
threading.py:297(_acquire_restore)
9900 0.013 0.000 0.043 0.000 threading.py:114(RLock)
9900 0.011 0.000 0.011 0.000
thread_pool_executor.py:34(__init__)
38886 0.011 0.000 0.011 0.000 {len}
9900 0.010 0.000 0.012 0.000 threading.py:288(__exit__)
9188 0.008 0.000 0.012 0.000 threading.py:294(_release_save)
19092 0.006 0.000 0.006 0.000 {thread.allocate_lock}
19799 0.005 0.000 0.005 0.000 {method 'popleft' of
'collections.deque' objects}
9188 0.004 0.000 0.004 0.000 {method 'append' of 'list'
objects}
9899 0.003 0.000 0.003 0.000 {method 'remove' of 'list'
objects}
9900 0.002 0.000 0.002 0.000 {method '__exit__' of
'thread.lock' objects}
1 0.000 0.000 0.000 0.000 {thread.start_new_thread}
1 0.000 0.000 0.000 0.000 threading.py:647(__init__)
1 0.000 0.000 0.000 0.000 threading.py:717(start)
1 0.000 0.000 0.000 0.000
thread_pool_executor.py:58(__init__)
1 0.000 0.000 0.000 0.000 threading.py:620(_newname)
1 0.000 0.000 0.000 0.000 _weakrefset.py:83(add)
1 0.000 0.000 0.000 0.000 threading.py:597(wait)
2 0.000 0.000 0.000 0.000 threading.py:561(__init__)
1 0.000 0.000 0.000 0.000
threading.py:1142(currentThread)
2 0.000 0.000 0.000 0.000 threading.py:542(Event)
1 0.000 0.000 0.000 0.000 threading.py:700(_set_daemon)
1 0.000 0.000 0.000 0.000 threading.py:1014(daemon)
2 0.000 0.000 0.000 0.000 threading.py:570(isSet)
1 0.000 0.000 0.000 0.000 threading.py:999(daemon)
1 0.000 0.000 0.000 0.000 {thread.get_ident}
1 0.000 0.000 0.000 0.000 {method 'add' of 'set' objects}
1 0.000 0.000 0.000 0.000 {method 'disable' of
'_lsprof.Profiler' objects}
```
The new performance is:
```
<apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object
at 0x7f71dd5cacd0> uses 1.82196497917
504935 function calls in 1.787 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
62233 0.940 0.000 0.940 0.000 {method 'acquire' of
'thread.lock' objects}
9900 0.142 0.000 1.248 0.000
thread_pool_executor.py:98(submit)
29698 0.102 0.000 0.364 0.000 threading.py:373(notify)
19800 0.101 0.000 0.880 0.000 Queue.py:150(get)
9900 0.068 0.000 0.068 0.000 {method '__enter__' of
'thread.lock' objects}
40304 0.063 0.000 0.063 0.000 {method 'release' of
'thread.lock' objects}
9899 0.053 0.000 0.323 0.000 threading.py:479(release)
9899 0.044 0.000 0.299 0.000 Queue.py:86(qsize)
9903 0.043 0.000 0.082 0.000 threading.py:260(__init__)
30407 0.031 0.000 0.221 0.000 threading.py:300(_is_owned)
9900 0.027 0.000 0.119 0.000 _base.py:318(__init__)
30407 0.022 0.000 0.027 0.000 Queue.py:200(_qsize)
9900 0.017 0.000 0.025 0.000 threading.py:132(__init__)
19799 0.016 0.000 0.019 0.000 Queue.py:208(_get)
9899 0.015 0.000 0.338 0.000
thread_pool_executor.py:77(assign_work)
40307 0.015 0.000 0.015 0.000 threading.py:64(_note)
9900 0.015 0.000 0.019 0.000 threading.py:288(__exit__)
9903 0.011 0.000 0.093 0.000 threading.py:242(Condition)
19806 0.010 0.000 0.010 0.000 threading.py:59(__init__)
9900 0.009 0.000 0.034 0.000 threading.py:114(RLock)
9900 0.009 0.000 0.077 0.000 threading.py:285(__enter__)
709 0.007 0.000 0.148 0.000 threading.py:309(wait)
9900 0.007 0.000 0.007 0.000
thread_pool_executor.py:34(__init__)
30407 0.005 0.000 0.005 0.000 {len}
9900 0.004 0.000 0.004 0.000 {method '__exit__' of
'thread.lock' objects}
10613 0.003 0.000 0.003 0.000 {thread.allocate_lock}
19799 0.003 0.000 0.003 0.000 {method 'popleft' of
'collections.deque' objects}
9896 0.002 0.000 0.002 0.000 {method 'remove' of 'list'
objects}
709 0.001 0.000 0.023 0.000
threading.py:297(_acquire_restore)
709 0.001 0.000 0.002 0.000 threading.py:294(_release_save)
709 0.000 0.000 0.000 0.000 {method 'append' of 'list'
objects}
1 0.000 0.000 0.000 0.000 {thread.start_new_thread}
1 0.000 0.000 0.000 0.000 threading.py:717(start)
1 0.000 0.000 0.000 0.000 threading.py:647(__init__)
1 0.000 0.000 0.000 0.000 threading.py:620(_newname)
1 0.000 0.000 0.000 0.000
thread_pool_executor.py:58(__init__)
1 0.000 0.000 0.000 0.000
threading.py:1142(currentThread)
1 0.000 0.000 0.000 0.000 _weakrefset.py:83(add)
1 0.000 0.000 0.000 0.000 threading.py:597(wait)
1 0.000 0.000 0.000 0.000 threading.py:433(__init__)
1 0.000 0.000 0.000 0.000 threading.py:700(_set_daemon)
1 0.000 0.000 0.000 0.000 threading.py:542(Event)
1 0.000 0.000 0.000 0.000 threading.py:561(__init__)
1 0.000 0.000 0.000 0.000 threading.py:1014(daemon)
1 0.000 0.000 0.000 0.000 threading.py:412(Semaphore)
2 0.000 0.000 0.000 0.000 threading.py:570(isSet)
1 0.000 0.000 0.000 0.000 threading.py:999(daemon)
1 0.000 0.000 0.000 0.000 {thread.get_ident}
1 0.000 0.000 0.000 0.000 {method 'add' of 'set' objects}
1 0.000 0.000 0.000 0.000 {method 'disable' of
'_lsprof.Profiler' objects}
```
The max_workers=1 ThreadPoolExecutor is:
```
<concurrent.futures.thread.ThreadPoolExecutor object at 0x7f71dd5e7710> uses
1.03040599823
436503 function calls in 1.011 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
59160 0.462 0.000 0.462 0.000 {method 'acquire' of
'thread.lock' objects}
9900 0.114 0.000 0.114 0.000 {method '__enter__' of
'thread.lock' objects}
9900 0.072 0.000 0.537 0.000 thread.py:128(submit)
9900 0.045 0.000 0.474 0.000 Queue.py:150(get)
9900 0.034 0.000 0.167 0.000 threading.py:440(acquire)
9900 0.034 0.000 0.061 0.000 threading.py:260(__init__)
19800 0.033 0.000 0.112 0.000 threading.py:373(notify)
9900 0.031 0.000 0.192 0.000 Queue.py:107(put)
4890 0.024 0.000 0.269 0.000 threading.py:309(wait)
9900 0.022 0.000 0.092 0.000 _base.py:318(__init__)
25561 0.017 0.000 0.017 0.000 {method 'release' of
'thread.lock' objects}
24690 0.016 0.000 0.091 0.000 threading.py:300(_is_owned)
9900 0.011 0.000 0.016 0.000 threading.py:132(__init__)
34590 0.010 0.000 0.010 0.000 threading.py:64(_note)
9900 0.009 0.000 0.176 0.000
thread.py:141(_adjust_thread_count)
9900 0.009 0.000 0.070 0.000 threading.py:242(Condition)
9900 0.007 0.000 0.023 0.000 threading.py:114(RLock)
14790 0.007 0.000 0.009 0.000 Queue.py:200(_qsize)
19800 0.007 0.000 0.007 0.000 threading.py:59(__init__)
9900 0.007 0.000 0.008 0.000 Queue.py:208(_get)
9900 0.006 0.000 0.010 0.000 threading.py:288(__exit__)
9900 0.006 0.000 0.007 0.000 Queue.py:204(_put)
9900 0.006 0.000 0.119 0.000 threading.py:285(__enter__)
9900 0.006 0.000 0.006 0.000 thread.py:52(__init__)
9900 0.003 0.000 0.003 0.000 {method '__exit__' of
'thread.lock' objects}
14790 0.003 0.000 0.003 0.000 {thread.allocate_lock}
4890 0.003 0.000 0.020 0.000
threading.py:297(_acquire_restore)
4890 0.003 0.000 0.008 0.000 threading.py:294(_release_save)
14790 0.002 0.000 0.002 0.000 {len}
9900 0.001 0.000 0.001 0.000 {method 'popleft' of
'collections.deque' objects}
9900 0.001 0.000 0.001 0.000 {method 'append' of
'collections.deque' objects}
4890 0.001 0.000 0.001 0.000 {method 'append' of 'list'
objects}
871 0.000 0.000 0.000 0.000 {method 'remove' of 'list'
objects}
1 0.000 0.000 0.000 0.000 {method 'disable' of
'_lsprof.Profiler' objects}
```
The max_workers=12 ThreadPoolExecutor is:
```
<concurrent.futures.thread.ThreadPoolExecutor object at 0x7f71dd575d90> uses
2.03954315186
402533 function calls in 2.002 seconds
Ordered by: internal time
ncalls tottime percall cumtime percall filename:lineno(function)
44284 0.878 0.000 0.878 0.000 {method 'acquire' of
'thread.lock' objects}
9902 0.228 0.000 0.228 0.000 {method '__enter__' of
'thread.lock' objects}
9900 0.207 0.000 1.342 0.000 thread.py:128(submit)
9900 0.084 0.000 0.660 0.000 Queue.py:150(get)
19800 0.084 0.000 0.269 0.000 threading.py:373(notify)
9900 0.080 0.000 0.351 0.000 threading.py:440(acquire)
30862 0.069 0.000 0.069 0.000 {method 'release' of
'thread.lock' objects}
9900 0.054 0.000 0.594 0.000 Queue.py:107(put)
9904 0.054 0.000 0.102 0.000 threading.py:260(__init__)
9900 0.032 0.000 0.150 0.000 _base.py:318(__init__)
20971 0.027 0.000 0.144 0.000 threading.py:300(_is_owned)
9900 0.022 0.000 0.383 0.000
thread.py:141(_adjust_thread_count)
9900 0.018 0.000 0.028 0.000 threading.py:132(__init__)
30871 0.017 0.000 0.017 0.000 threading.py:64(_note)
9904 0.016 0.000 0.118 0.000 threading.py:242(Condition)
9902 0.015 0.000 0.244 0.000 threading.py:285(__enter__)
9902 0.014 0.000 0.020 0.000 threading.py:288(__exit__)
19808 0.013 0.000 0.013 0.000 threading.py:59(__init__)
11069 0.013 0.000 0.016 0.000 Queue.py:200(_qsize)
9900 0.013 0.000 0.041 0.000 threading.py:114(RLock)
1171 0.011 0.000 0.234 0.000 threading.py:309(wait)
9900 0.011 0.000 0.012 0.000 Queue.py:204(_put)
9900 0.011 0.000 0.012 0.000 Queue.py:208(_get)
9900 0.008 0.000 0.008 0.000 thread.py:52(__init__)
9902 0.007 0.000 0.007 0.000 {method '__exit__' of
'thread.lock' objects}
11075 0.005 0.000 0.005 0.000 {thread.allocate_lock}
11071 0.003 0.000 0.003 0.000 {len}
9891 0.002 0.000 0.002 0.000 {method 'remove' of 'list'
objects}
9900 0.002 0.000 0.002 0.000 {method 'popleft' of
'collections.deque' objects}
9900 0.002 0.000 0.002 0.000 {method 'append' of
'collections.deque' objects}
1171 0.001 0.000 0.041 0.000
threading.py:297(_acquire_restore)
1171 0.001 0.000 0.004 0.000 threading.py:294(_release_save)
1171 0.000 0.000 0.000 0.000 {method 'append' of 'list'
objects}
2 0.000 0.000 0.000 0.000 {thread.start_new_thread}
2 0.000 0.000 0.000 0.000 threading.py:647(__init__)
2 0.000 0.000 0.010 0.005 threading.py:717(start)
2 0.000 0.000 0.010 0.005 threading.py:597(wait)
2 0.000 0.000 0.000 0.000 threading.py:700(_set_daemon)
2 0.000 0.000 0.000 0.000 weakref.py:368(__setitem__)
2 0.000 0.000 0.000 0.000 threading.py:542(Event)
2 0.000 0.000 0.000 0.000
threading.py:1142(currentThread)
2 0.000 0.000 0.000 0.000 threading.py:561(__init__)
2 0.000 0.000 0.000 0.000 threading.py:1014(daemon)
2 0.000 0.000 0.000 0.000 {method 'add' of 'set' objects}
2 0.000 0.000 0.000 0.000 threading.py:999(daemon)
4 0.000 0.000 0.000 0.000 threading.py:570(isSet)
2 0.000 0.000 0.000 0.000 {thread.get_ident}
1 0.000 0.000 0.000 0.000 {method 'disable' of
'_lsprof.Profiler' objects}
```
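A profile of this shape could be collected along the following lines. This is
a minimal sketch under stated assumptions, not the script used for the numbers
above: `run_perf` is adapted from the benchmark in the Jira issue with a
scaled-down task count, `profile_executor` is a hypothetical helper, the
standard library `ThreadPoolExecutor` stands in for
`UnboundedThreadPoolExecutor`, and the code targets Python 3 whereas the logs
above come from Python 2.

```python
import cProfile
import io
import pstats
import queue
import time
from concurrent import futures


def run_perf(executor, total_number=2000):
    """Scaled-down version of the benchmark loop; returns elapsed seconds."""
    q = queue.Queue()

    def task(number):
        hash(number)
        q.put(number + 200)
        return number

    start = time.time()
    # Seed the queue so the submitting thread always has work to hand out.
    for i in range(200):
        q.put(i)
    for _ in range(total_number):
        executor.submit(task, q.get(block=True))
    return time.time() - start


def profile_executor(executor, total_number=2000):
    """Profile run_perf on the calling thread and return the report text."""
    profiler = cProfile.Profile()
    profiler.enable()
    run_perf(executor, total_number)
    profiler.disable()
    out = io.StringIO()
    # "tottime" produces the "Ordered by: internal time" header seen above.
    pstats.Stats(profiler, stream=out).sort_stats("tottime").print_stats(10)
    return out.getvalue()


if __name__ == "__main__":
    with futures.ThreadPoolExecutor(max_workers=1) as executor:
        print(profile_executor(executor))
```

Note that `cProfile` only records the thread that enabled it, so a report
produced this way covers the submitting side (`submit`, `Queue.get`, lock
acquisition) and not the work done inside the pool's worker threads.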
Issue Time Tracking
-------------------
Worklog Id: (was: 429650)
Time Spent: 4.5h (was: 4h 20m)
> Python SDK harness performance degradation with UnboundedThreadPoolExecutor
> ---------------------------------------------------------------------------
>
> Key: BEAM-8944
> URL: https://issues.apache.org/jira/browse/BEAM-8944
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Affects Versions: 2.18.0
> Reporter: Yichi Zhang
> Priority: Critical
> Attachments: checkpoint-duration.png, profiling.png,
> profiling_one_thread.png, profiling_twelve_threads.png
>
> Time Spent: 4.5h
> Remaining Estimate: 0h
>
> We are seeing a performance degradation in the Python streaming word count
> load tests.
>
> After some investigation, it appears to be caused by swapping the original
> ThreadPoolExecutor for UnboundedThreadPoolExecutor in the SDK worker. The
> suspicion is that Python performance is worse with more threads on CPU-bound
> tasks.
>
> A simple test comparing the performance of the thread pool executors:
>
> {code:python}
> def test_performance(self):
>   def run_perf(executor):
>     total_number = 1000000
>     q = queue.Queue()
>
>     def task(number):
>       hash(number)
>       q.put(number + 200)
>       return number
>
>     t = time.time()
>     count = 0
>     for i in range(200):
>       q.put(i)
>
>     while count < total_number:
>       executor.submit(task, q.get(block=True))
>       count += 1
>     print('%s uses %s' % (executor, time.time() - t))
>
>   with UnboundedThreadPoolExecutor() as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=1) as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=12) as executor:
>     run_perf(executor)
> {code}
> Results:
> <apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7fab400dbe50> uses 268.160675049
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab40096290> uses 79.904583931
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab400dbe50> uses 191.179054976
> Profiling:
> UnboundedThreadPoolExecutor:
> !profiling.png!
> 1 Thread ThreadPoolExecutor:
> !profiling_one_thread.png!
> 12 Threads ThreadPoolExecutor:
> !profiling_twelve_threads.png!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)