lukecwik opened a new pull request #11590: URL: https://github.com/apache/beam/pull/11590
Existing performance suffered because of the use of timed waits and because of the increased number of "threading" objects being invoked.

Using the benchmark from https://issues.apache.org/jira/browse/BEAM-8944?focusedCommentId=17074641&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17074641

Performance improved from 5.52s to 1.82s, which is faster than the ThreadPoolExecutor with 12 threads (the default being used before) but still slower than the ThreadPoolExecutor with 1 thread.

The prior performance was:
```
<apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7feee655c8d0> uses 5.52247905731
         584051 function calls in 5.495 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    86250    4.386    0.000    4.386    0.000 {method 'acquire' of 'thread.lock' objects}
    19800    0.150    0.000    4.773    0.000 Queue.py:150(get)
    29698    0.123    0.000    0.365    0.000 threading.py:373(notify)
     9900    0.114    0.000    0.114    0.000 {method '__enter__' of 'thread.lock' objects}
     9899    0.106    0.000    0.463    0.000 thread_pool_executor.py:103(accepted_work)
     9188    0.089    0.000    4.095    0.000 threading.py:309(wait)
     9903    0.078    0.000    0.129    0.000 threading.py:260(__init__)
     9900    0.061    0.000    1.092    0.000 thread_pool_executor.py:133(submit)
     9899    0.055    0.000    0.358    0.000 threading.py:576(set)
    38886    0.044    0.000    0.285    0.000 threading.py:300(_is_owned)
     9900    0.041    0.000    0.186    0.000 _base.py:318(__init__)
    28987    0.023    0.000    0.031    0.000 Queue.py:200(_qsize)
     9900    0.022    0.000    0.030    0.000 threading.py:132(__init__)
    38887    0.021    0.000    0.021    0.000 threading.py:64(_note)
     9899    0.019    0.000    0.163    0.000 threading.py:400(notifyAll)
    19799    0.018    0.000    0.023    0.000 Queue.py:208(_get)
    38887    0.016    0.000    0.016    0.000 {method 'release' of 'thread.lock' objects}
     9903    0.016    0.000    0.145    0.000 threading.py:242(Condition)
    19806    0.014    0.000    0.014    0.000 threading.py:59(__init__)
     9900    0.014    0.000    0.127    0.000 threading.py:285(__enter__)
     9188    0.013    0.000    0.090    0.000 threading.py:297(_acquire_restore)
     9900    0.013    0.000    0.043    0.000 threading.py:114(RLock)
     9900    0.011    0.000    0.011    0.000 thread_pool_executor.py:34(__init__)
    38886    0.011    0.000    0.011    0.000 {len}
     9900    0.010    0.000    0.012    0.000 threading.py:288(__exit__)
     9188    0.008    0.000    0.012    0.000 threading.py:294(_release_save)
    19092    0.006    0.000    0.006    0.000 {thread.allocate_lock}
    19799    0.005    0.000    0.005    0.000 {method 'popleft' of 'collections.deque' objects}
     9188    0.004    0.000    0.004    0.000 {method 'append' of 'list' objects}
     9899    0.003    0.000    0.003    0.000 {method 'remove' of 'list' objects}
     9900    0.002    0.000    0.002    0.000 {method '__exit__' of 'thread.lock' objects}
        1    0.000    0.000    0.000    0.000 {thread.start_new_thread}
        1    0.000    0.000    0.000    0.000 threading.py:647(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:717(start)
        1    0.000    0.000    0.000    0.000 thread_pool_executor.py:58(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:620(_newname)
        1    0.000    0.000    0.000    0.000 _weakrefset.py:83(add)
        1    0.000    0.000    0.000    0.000 threading.py:597(wait)
        2    0.000    0.000    0.000    0.000 threading.py:561(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:1142(currentThread)
        2    0.000    0.000    0.000    0.000 threading.py:542(Event)
        1    0.000    0.000    0.000    0.000 threading.py:700(_set_daemon)
        1    0.000    0.000    0.000    0.000 threading.py:1014(daemon)
        2    0.000    0.000    0.000    0.000 threading.py:570(isSet)
        1    0.000    0.000    0.000    0.000 threading.py:999(daemon)
        1    0.000    0.000    0.000    0.000 {thread.get_ident}
        1    0.000    0.000    0.000    0.000 {method 'add' of 'set' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
```

The new performance is:
```
<apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7f71dd5cacd0> uses 1.82196497917
         504935 function calls in 1.787 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    62233    0.940    0.000    0.940    0.000 {method 'acquire' of 'thread.lock' objects}
     9900    0.142    0.000    1.248    0.000 thread_pool_executor.py:98(submit)
    29698    0.102    0.000    0.364    0.000 threading.py:373(notify)
    19800    0.101    0.000    0.880    0.000 Queue.py:150(get)
     9900    0.068    0.000    0.068    0.000 {method '__enter__' of 'thread.lock' objects}
    40304    0.063    0.000    0.063    0.000 {method 'release' of 'thread.lock' objects}
     9899    0.053    0.000    0.323    0.000 threading.py:479(release)
     9899    0.044    0.000    0.299    0.000 Queue.py:86(qsize)
     9903    0.043    0.000    0.082    0.000 threading.py:260(__init__)
    30407    0.031    0.000    0.221    0.000 threading.py:300(_is_owned)
     9900    0.027    0.000    0.119    0.000 _base.py:318(__init__)
    30407    0.022    0.000    0.027    0.000 Queue.py:200(_qsize)
     9900    0.017    0.000    0.025    0.000 threading.py:132(__init__)
    19799    0.016    0.000    0.019    0.000 Queue.py:208(_get)
     9899    0.015    0.000    0.338    0.000 thread_pool_executor.py:77(assign_work)
    40307    0.015    0.000    0.015    0.000 threading.py:64(_note)
     9900    0.015    0.000    0.019    0.000 threading.py:288(__exit__)
     9903    0.011    0.000    0.093    0.000 threading.py:242(Condition)
    19806    0.010    0.000    0.010    0.000 threading.py:59(__init__)
     9900    0.009    0.000    0.034    0.000 threading.py:114(RLock)
     9900    0.009    0.000    0.077    0.000 threading.py:285(__enter__)
      709    0.007    0.000    0.148    0.000 threading.py:309(wait)
     9900    0.007    0.000    0.007    0.000 thread_pool_executor.py:34(__init__)
    30407    0.005    0.000    0.005    0.000 {len}
     9900    0.004    0.000    0.004    0.000 {method '__exit__' of 'thread.lock' objects}
    10613    0.003    0.000    0.003    0.000 {thread.allocate_lock}
    19799    0.003    0.000    0.003    0.000 {method 'popleft' of 'collections.deque' objects}
     9896    0.002    0.000    0.002    0.000 {method 'remove' of 'list' objects}
      709    0.001    0.000    0.023    0.000 threading.py:297(_acquire_restore)
      709    0.001    0.000    0.002    0.000 threading.py:294(_release_save)
      709    0.000    0.000    0.000    0.000 {method 'append' of 'list' objects}
        1    0.000    0.000    0.000    0.000 {thread.start_new_thread}
        1    0.000    0.000    0.000    0.000 threading.py:717(start)
        1    0.000    0.000    0.000    0.000 threading.py:647(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:620(_newname)
        1    0.000    0.000    0.000    0.000 thread_pool_executor.py:58(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:1142(currentThread)
        1    0.000    0.000    0.000    0.000 _weakrefset.py:83(add)
        1    0.000    0.000    0.000    0.000 threading.py:597(wait)
        1    0.000    0.000    0.000    0.000 threading.py:433(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:700(_set_daemon)
        1    0.000    0.000    0.000    0.000 threading.py:542(Event)
        1    0.000    0.000    0.000    0.000 threading.py:561(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:1014(daemon)
        1    0.000    0.000    0.000    0.000 threading.py:412(Semaphore)
        2    0.000    0.000    0.000    0.000 threading.py:570(isSet)
        1    0.000    0.000    0.000    0.000 threading.py:999(daemon)
        1    0.000    0.000    0.000    0.000 {thread.get_ident}
        1    0.000    0.000    0.000    0.000 {method 'add' of 'set' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
```

The max_workers=1 ThreadPoolExecutor is:
```
<concurrent.futures.thread.ThreadPoolExecutor object at 0x7f71dd5e7710> uses 1.03040599823
         436503 function calls in 1.011 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    59160    0.462    0.000    0.462    0.000 {method 'acquire' of 'thread.lock' objects}
     9900    0.114    0.000    0.114    0.000 {method '__enter__' of 'thread.lock' objects}
     9900    0.072    0.000    0.537    0.000 thread.py:128(submit)
     9900    0.045    0.000    0.474    0.000 Queue.py:150(get)
     9900    0.034    0.000    0.167    0.000 threading.py:440(acquire)
     9900    0.034    0.000    0.061    0.000 threading.py:260(__init__)
    19800    0.033    0.000    0.112    0.000 threading.py:373(notify)
     9900    0.031    0.000    0.192    0.000 Queue.py:107(put)
     4890    0.024    0.000    0.269    0.000 threading.py:309(wait)
     9900    0.022    0.000    0.092    0.000 _base.py:318(__init__)
    25561    0.017    0.000    0.017    0.000 {method 'release' of 'thread.lock' objects}
    24690    0.016    0.000    0.091    0.000 threading.py:300(_is_owned)
     9900    0.011    0.000    0.016    0.000 threading.py:132(__init__)
    34590    0.010    0.000    0.010    0.000 threading.py:64(_note)
     9900    0.009    0.000    0.176    0.000 thread.py:141(_adjust_thread_count)
     9900    0.009    0.000    0.070    0.000 threading.py:242(Condition)
     9900    0.007    0.000    0.023    0.000 threading.py:114(RLock)
    14790    0.007    0.000    0.009    0.000 Queue.py:200(_qsize)
    19800    0.007    0.000    0.007    0.000 threading.py:59(__init__)
     9900    0.007    0.000    0.008    0.000 Queue.py:208(_get)
     9900    0.006    0.000    0.010    0.000 threading.py:288(__exit__)
     9900    0.006    0.000    0.007    0.000 Queue.py:204(_put)
     9900    0.006    0.000    0.119    0.000 threading.py:285(__enter__)
     9900    0.006    0.000    0.006    0.000 thread.py:52(__init__)
     9900    0.003    0.000    0.003    0.000 {method '__exit__' of 'thread.lock' objects}
    14790    0.003    0.000    0.003    0.000 {thread.allocate_lock}
     4890    0.003    0.000    0.020    0.000 threading.py:297(_acquire_restore)
     4890    0.003    0.000    0.008    0.000 threading.py:294(_release_save)
    14790    0.002    0.000    0.002    0.000 {len}
     9900    0.001    0.000    0.001    0.000 {method 'popleft' of 'collections.deque' objects}
     9900    0.001    0.000    0.001    0.000 {method 'append' of 'collections.deque' objects}
     4890    0.001    0.000    0.001    0.000 {method 'append' of 'list' objects}
      871    0.000    0.000    0.000    0.000 {method 'remove' of 'list' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
```

The max_workers=12 ThreadPoolExecutor is:
```
<concurrent.futures.thread.ThreadPoolExecutor object at 0x7f71dd575d90> uses 2.03954315186
         402533 function calls in 2.002 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    44284    0.878    0.000    0.878    0.000 {method 'acquire' of 'thread.lock' objects}
     9902    0.228    0.000    0.228    0.000 {method '__enter__' of 'thread.lock' objects}
     9900    0.207    0.000    1.342    0.000 thread.py:128(submit)
     9900    0.084    0.000    0.660    0.000 Queue.py:150(get)
    19800    0.084    0.000    0.269    0.000 threading.py:373(notify)
     9900    0.080    0.000    0.351    0.000 threading.py:440(acquire)
    30862    0.069    0.000    0.069    0.000 {method 'release' of 'thread.lock' objects}
     9900    0.054    0.000    0.594    0.000 Queue.py:107(put)
     9904    0.054    0.000    0.102    0.000 threading.py:260(__init__)
     9900    0.032    0.000    0.150    0.000 _base.py:318(__init__)
    20971    0.027    0.000    0.144    0.000 threading.py:300(_is_owned)
     9900    0.022    0.000    0.383    0.000 thread.py:141(_adjust_thread_count)
     9900    0.018    0.000    0.028    0.000 threading.py:132(__init__)
    30871    0.017    0.000    0.017    0.000 threading.py:64(_note)
     9904    0.016    0.000    0.118    0.000 threading.py:242(Condition)
     9902    0.015    0.000    0.244    0.000 threading.py:285(__enter__)
     9902    0.014    0.000    0.020    0.000 threading.py:288(__exit__)
    19808    0.013    0.000    0.013    0.000 threading.py:59(__init__)
    11069    0.013    0.000    0.016    0.000 Queue.py:200(_qsize)
     9900    0.013    0.000    0.041    0.000 threading.py:114(RLock)
     1171    0.011    0.000    0.234    0.000 threading.py:309(wait)
     9900    0.011    0.000    0.012    0.000 Queue.py:204(_put)
     9900    0.011    0.000    0.012    0.000 Queue.py:208(_get)
     9900    0.008    0.000    0.008    0.000 thread.py:52(__init__)
     9902    0.007    0.000    0.007    0.000 {method '__exit__' of 'thread.lock' objects}
    11075    0.005    0.000    0.005    0.000 {thread.allocate_lock}
    11071    0.003    0.000    0.003    0.000 {len}
     9891    0.002    0.000    0.002    0.000 {method 'remove' of 'list' objects}
     9900    0.002    0.000    0.002    0.000 {method 'popleft' of 'collections.deque' objects}
     9900    0.002    0.000    0.002    0.000 {method 'append' of 'collections.deque' objects}
     1171    0.001    0.000    0.041    0.000 threading.py:297(_acquire_restore)
     1171    0.001    0.000    0.004    0.000 threading.py:294(_release_save)
     1171    0.000    0.000    0.000    0.000 {method 'append' of 'list' objects}
        2    0.000    0.000    0.000    0.000 {thread.start_new_thread}
        2    0.000    0.000    0.000    0.000 threading.py:647(__init__)
        2    0.000    0.000    0.010    0.005 threading.py:717(start)
        2    0.000    0.000    0.010    0.005 threading.py:597(wait)
        2    0.000    0.000    0.000    0.000 threading.py:700(_set_daemon)
        2    0.000    0.000    0.000    0.000 weakref.py:368(__setitem__)
        2    0.000    0.000    0.000    0.000 threading.py:542(Event)
        2    0.000    0.000    0.000    0.000 threading.py:1142(currentThread)
        2    0.000    0.000    0.000    0.000 threading.py:561(__init__)
        2    0.000    0.000    0.000    0.000 threading.py:1014(daemon)
        2    0.000    0.000    0.000    0.000 {method 'add' of 'set' objects}
        2    0.000    0.000    0.000    0.000 threading.py:999(daemon)
        4    0.000    0.000    0.000    0.000 threading.py:570(isSet)
        2    0.000    0.000    0.000    0.000 {thread.get_ident}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
```
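For anyone wanting to collect profiles of the same shape as the ones above, a harness roughly like the following could be used. This is only a sketch and not the benchmark from the linked JIRA comment: the helper name `profile_submits`, the no-op task, and the default of 9900 tasks (borrowed from the `submit` call counts in the profiles) are all assumptions, and it targets Python 3 while the profiles above were captured on Python 2.

```python
import cProfile
import io
import pstats
import time
from concurrent.futures import ThreadPoolExecutor


def profile_submits(executor, num_tasks=9900):
    """Profile submitting `num_tasks` no-op tasks (hypothetical helper).

    Returns (elapsed_seconds, report_text). cProfile only observes the
    thread that enabled it, so the report covers the submitting thread.
    """
    profiler = cProfile.Profile()
    start = time.time()
    profiler.enable()
    futures = [executor.submit(lambda: None) for _ in range(num_tasks)]
    for future in futures:
        future.result()  # wait for every task so the timing is complete
    profiler.disable()
    elapsed = time.time() - start

    out = io.StringIO()
    # "tottime" sorts by internal time, like the "Ordered by" line above.
    pstats.Stats(profiler, stream=out).sort_stats("tottime").print_stats(10)
    return elapsed, out.getvalue()


if __name__ == "__main__":
    # Assumed comparison points: the two stock executors from the description.
    for workers in (1, 12):
        with ThreadPoolExecutor(max_workers=workers) as executor:
            elapsed, report = profile_submits(executor)
        print(executor, "uses", elapsed)
        print(report)
```

Passing an `UnboundedThreadPoolExecutor` instance instead of the stock executor should give the corresponding Beam numbers, since the helper relies only on `submit`.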
