[ https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074641#comment-17074641 ]

Maximilian Michels commented on BEAM-8944:
------------------------------------------

Thank you for your thoughts, Luke. I agree that the best approach is to work on 
a solution based on profiling the current code.

Here is my attempt to write a benchmark that closely resembles our setup. 
There are at most two active bundles per SDK harness (remember, we use 16 of 
them and distribute bundles round-robin), hence two active tasks. This is 
adjustable via the parameters of the {{run_benchmark}} method:

{code:python}
import cProfile as profiler
import time
from concurrent import futures
from future.moves import queue
from utils.thread_pool_executor import UnboundedThreadPoolExecutor


def run_benchmark(executor,
                  max_concurrent_threads=2,
                  total_iterations=10000,
                  warmup_iterations=100):
  # The queue holds one token per allowed in-flight task, so at most
  # max_concurrent_threads tasks run concurrently.
  q = queue.Queue()
  for i in range(max_concurrent_threads):
    q.put(i)

  def task(number):
    # Return the token to the queue so the main loop can submit the next task.
    q.put(number)
    return number

  count = 0
  start_time = None
  profile = profiler.Profile()

  while count < total_iterations:
    if count == warmup_iterations:
      start_time = time.time()
      profile.enable()
    # Blocks until a token is available, i.e. until a running task finishes.
    executor.submit(task, q.get(block=True))
    count += 1

  profile.disable()
  print('%s uses %s' % (executor, time.time() - start_time))
  profile.print_stats(sort='time')


if __name__ == '__main__':
  with UnboundedThreadPoolExecutor() as executor:
    run_benchmark(executor)
  with futures.ThreadPoolExecutor(max_workers=1) as executor:
    run_benchmark(executor)
  with futures.ThreadPoolExecutor(max_workers=12) as executor:
    run_benchmark(executor)
{code}

The results clearly reveal the cost of the current implementation:

{noformat}
<utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x10da1f750> uses 7.32124900818
         575749 function calls in 7.302 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    83878    6.701    0.000    6.701    0.000 {method 'acquire' of 'thread.lock' objects}
    19800    0.087    0.000    6.958    0.000 Queue.py:150(get)
     8595    0.064    0.000    6.662    0.001 threading.py:309(wait)
    29698    0.062    0.000    0.157    0.000 threading.py:373(notify)
     9903    0.045    0.000    0.086    0.000 threading.py:260(__init__)
     9900    0.043    0.000    0.481    0.000 thread_pool_executor.py:134(submit)
     9899    0.040    0.000    0.176    0.000 thread_pool_executor.py:103(accepted_work)
    38293    0.028    0.000    0.099    0.000 threading.py:300(_is_owned)
     9899    0.025    0.000    0.136    0.000 threading.py:576(set)
     9900    0.020    0.000    0.020    0.000 {method '__enter__' of 'thread.lock' objects}
     9900    0.020    0.000    0.117    0.000 _base.py:318(__init__)
     9900    0.014    0.000    0.024    0.000 threading.py:132(__init__)
    38294    0.013    0.000    0.013    0.000 threading.py:64(_note)
    28394    0.013    0.000    0.018    0.000 Queue.py:200(_qsize)
    19806    0.012    0.000    0.012    0.000 threading.py:59(__init__)
    19799    0.012    0.000    0.014    0.000 Queue.py:208(_get)
     9899    0.011    0.000    0.076    0.000 threading.py:400(notifyAll)
     9903    0.011    0.000    0.098    0.000 threading.py:242(Condition)
     8595    0.010    0.000    0.053    0.000 threading.py:297(_acquire_restore)
    18499    0.009    0.000    0.009    0.000 {thread.allocate_lock}
     9900    0.009    0.000    0.033    0.000 threading.py:114(RLock)
    38294    0.009    0.000    0.009    0.000 {method 'release' of 'thread.lock' objects}
     9900    0.007    0.000    0.007    0.000 thread_pool_executor.py:34(__init__)
     9900    0.007    0.000    0.026    0.000 threading.py:285(__enter__)
    38293    0.007    0.000    0.007    0.000 {len}
     9900    0.007    0.000    0.008    0.000 threading.py:288(__exit__)
     9899    0.004    0.000    0.004    0.000 {method 'remove' of 'list' objects}
     8595    0.004    0.000    0.006    0.000 threading.py:294(_release_save)
     8595    0.003    0.000    0.003    0.000 {method 'append' of 'list' objects}
    19799    0.003    0.000    0.003    0.000 {method 'popleft' of 'collections.deque' objects}
     9900    0.002    0.000    0.002    0.000 {method '__exit__' of 'thread.lock' objects}
        1    0.000    0.000    0.000    0.000 {thread.start_new_thread}
        1    0.000    0.000    0.000    0.000 threading.py:647(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:717(start)
        2    0.000    0.000    0.000    0.000 threading.py:561(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:620(_newname)
        1    0.000    0.000    0.000    0.000 threading.py:597(wait)
        1    0.000    0.000    0.000    0.000 _weakrefset.py:83(add)
        1    0.000    0.000    0.000    0.000 thread_pool_executor.py:58(__init__)
        1    0.000    0.000    0.000    0.000 threading.py:700(_set_daemon)
        1    0.000    0.000    0.000    0.000 threading.py:1143(currentThread)
        1    0.000    0.000    0.000    0.000 threading.py:1015(daemon)
        2    0.000    0.000    0.000    0.000 threading.py:542(Event)
        1    0.000    0.000    0.000    0.000 {thread.get_ident}
        1    0.000    0.000    0.000    0.000 {method 'add' of 'set' objects}
        2    0.000    0.000    0.000    0.000 threading.py:570(isSet)
        1    0.000    0.000    0.000    0.000 threading.py:999(daemon)
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}


<concurrent.futures.thread.ThreadPoolExecutor object at 0x10da12110> uses 0.855124950409
         419751 function calls in 0.840 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    53064    0.452    0.000    0.452    0.000 {method 'acquire' of 'thread.lock' objects}
     9900    0.063    0.000    0.063    0.000 {method '__enter__' of 'thread.lock' objects}
     9900    0.047    0.000    0.385    0.000 thread.py:128(submit)
     9900    0.031    0.000    0.455    0.000 Queue.py:150(get)
     9900    0.028    0.000    0.054    0.000 threading.py:260(__init__)
    19800    0.024    0.000    0.072    0.000 threading.py:373(notify)
     9900    0.024    0.000    0.103    0.000 threading.py:440(acquire)
     9900    0.023    0.000    0.145    0.000 Queue.py:107(put)
     3366    0.016    0.000    0.314    0.000 threading.py:309(wait)
    26329    0.015    0.000    0.015    0.000 {method 'release' of 'thread.lock' objects}
     9900    0.015    0.000    0.078    0.000 _base.py:318(__init__)
    23166    0.012    0.000    0.045    0.000 threading.py:300(_is_owned)
     9900    0.010    0.000    0.016    0.000 threading.py:132(__init__)
     9900    0.009    0.000    0.063    0.000 threading.py:242(Condition)
     9900    0.007    0.000    0.111    0.000 thread.py:141(_adjust_thread_count)
    19800    0.007    0.000    0.007    0.000 threading.py:59(__init__)
     9900    0.006    0.000    0.022    0.000 threading.py:114(RLock)
     9900    0.006    0.000    0.069    0.000 threading.py:285(__enter__)
    33066    0.006    0.000    0.006    0.000 threading.py:64(_note)
     9900    0.005    0.000    0.005    0.000 thread.py:52(__init__)
     9900    0.005    0.000    0.008    0.000 threading.py:288(__exit__)
     9900    0.005    0.000    0.006    0.000 Queue.py:204(_put)
    13266    0.004    0.000    0.006    0.000 Queue.py:200(_qsize)
     9900    0.004    0.000    0.005    0.000 Queue.py:208(_get)
    13266    0.004    0.000    0.004    0.000 {thread.allocate_lock}
     9900    0.003    0.000    0.003    0.000 {method '__exit__' of 'thread.lock' objects}
     3366    0.002    0.000    0.007    0.000 threading.py:297(_acquire_restore)
     3366    0.002    0.000    0.002    0.000 threading.py:294(_release_save)
    13266    0.002    0.000    0.002    0.000 {len}
     3163    0.001    0.000    0.001    0.000 {method 'remove' of 'list' objects}
     9900    0.001    0.000    0.001    0.000 {method 'append' of 'collections.deque' objects}
     3366    0.001    0.000    0.001    0.000 {method 'append' of 'list' objects}
     9900    0.001    0.000    0.001    0.000 {method 'popleft' of 'collections.deque' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}


<concurrent.futures.thread.ThreadPoolExecutor object at 0x10da18e50> uses 1.18780708313
         409313 function calls in 1.168 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    46328    0.518    0.000    0.518    0.000 {method 'acquire' of 'thread.lock' objects}
     9900    0.103    0.000    0.728    0.000 thread.py:128(submit)
     9900    0.095    0.000    0.095    0.000 {method '__enter__' of 'thread.lock' objects}
     9900    0.067    0.000    0.187    0.000 threading.py:440(acquire)
    19800    0.054    0.000    0.135    0.000 threading.py:373(notify)
     9900    0.052    0.000    0.440    0.000 Queue.py:150(get)
     9900    0.038    0.000    0.328    0.000 Queue.py:107(put)
    31214    0.035    0.000    0.035    0.000 {method 'release' of 'thread.lock' objects}
     9900    0.033    0.000    0.061    0.000 threading.py:260(__init__)
     9900    0.020    0.000    0.090    0.000 _base.py:318(__init__)
    21482    0.017    0.000    0.062    0.000 threading.py:300(_is_owned)
     9900    0.015    0.000    0.202    0.000 thread.py:141(_adjust_thread_count)
     1682    0.015    0.000    0.184    0.000 threading.py:309(wait)
     9900    0.011    0.000    0.106    0.000 threading.py:285(__enter__)
     9900    0.011    0.000    0.017    0.000 threading.py:132(__init__)
    31382    0.010    0.000    0.010    0.000 threading.py:64(_note)
    11582    0.010    0.000    0.011    0.000 Queue.py:200(_qsize)
     9900    0.009    0.000    0.070    0.000 threading.py:242(Condition)
    19800    0.008    0.000    0.008    0.000 threading.py:59(__init__)
     9900    0.007    0.000    0.024    0.000 threading.py:114(RLock)
     9900    0.006    0.000    0.007    0.000 Queue.py:204(_put)
     9900    0.005    0.000    0.005    0.000 thread.py:52(__init__)
     9900    0.005    0.000    0.010    0.000 threading.py:288(__exit__)
     9900    0.005    0.000    0.006    0.000 Queue.py:208(_get)
     9900    0.004    0.000    0.004    0.000 {method '__exit__' of 'thread.lock' objects}
    11582    0.004    0.000    0.004    0.000 {thread.allocate_lock}
     9732    0.004    0.000    0.004    0.000 {method 'remove' of 'list' objects}
     1682    0.002    0.000    0.003    0.000 threading.py:294(_release_save)
    11582    0.002    0.000    0.002    0.000 {len}
     1682    0.002    0.000    0.033    0.000 threading.py:297(_acquire_restore)
     9900    0.001    0.000    0.001    0.000 {method 'append' of 'collections.deque' objects}
     9900    0.001    0.000    0.001    0.000 {method 'popleft' of 'collections.deque' objects}
     1682    0.001    0.000    0.001    0.000 {method 'append' of 'list' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
{noformat}

It appears that the locking required per work item is very expensive: nearly 
all of the {{UnboundedThreadPoolExecutor}}'s 7.3 seconds is spent acquiring 
locks, compared with well under a second for the stdlib executors.
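For comparison, the stdlib {{ThreadPoolExecutor}} keeps all of its workers blocked on a single shared work queue, so a submit is one queue operation rather than a fresh condition/event handshake per item. A stripped-down sketch of that pattern ({{SharedQueuePool}} is a hypothetical name for illustration, not Beam or stdlib code):

```python
import threading

# Mirrors the future.moves import used in the benchmark above, for Python 2.
try:
  import queue
except ImportError:
  import Queue as queue

_SENTINEL = object()


class SharedQueuePool(object):
  """Minimal sketch of the shared-queue pattern: every submit is a single
  Queue.put (one lock acquire plus one notify), independent of pool size."""

  def __init__(self, num_workers):
    self._work = queue.Queue()
    self._threads = []
    for _ in range(num_workers):
      t = threading.Thread(target=self._run)
      t.daemon = True
      t.start()
      self._threads.append(t)

  def _run(self):
    # Workers block on the shared queue until work or a shutdown sentinel.
    while True:
      item = self._work.get()
      if item is _SENTINEL:
        return
      fn, args = item
      fn(*args)

  def submit(self, fn, *args):
    # One queue operation per work item; no per-item Condition/Event objects.
    self._work.put((fn, args))

  def shutdown(self):
    # Sentinels queue behind all pending work, so join() implies completion.
    for _ in self._threads:
      self._work.put(_SENTINEL)
    for t in self._threads:
      t.join()
```

Under this design the number of lock operations per submitted item stays constant regardless of pool size, which is consistent with the much smaller {{acquire}} totals in the stdlib profiles above.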

> Python SDK harness performance degradation with UnboundedThreadPoolExecutor
> ---------------------------------------------------------------------------
>
>                 Key: BEAM-8944
>                 URL: https://issues.apache.org/jira/browse/BEAM-8944
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-harness
>    Affects Versions: 2.18.0
>            Reporter: Yichi Zhang
>            Priority: Critical
>         Attachments: checkpoint-duration.png, profiling.png, 
> profiling_one_thread.png, profiling_twelve_threads.png
>
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> We are seeing a performance degradation in the Python streaming word count load 
> tests.
>  
> After some investigation, it appears to be caused by swapping the original 
> ThreadPoolExecutor for UnboundedThreadPoolExecutor in the SDK worker. The 
> suspicion is that Python performance is worse with more threads on CPU-bound 
> tasks.
>  
> A simple test comparing the performance of the different thread pool executors:
>  
> {code:python}
> def test_performance(self):
>   def run_perf(executor):
>     total_number = 1000000
>     q = queue.Queue()
>
>     def task(number):
>       hash(number)
>       q.put(number + 200)
>       return number
>
>     t = time.time()
>     count = 0
>     for i in range(200):
>       q.put(i)
>     while count < total_number:
>       executor.submit(task, q.get(block=True))
>       count += 1
>     print('%s uses %s' % (executor, time.time() - t))
>
>   with UnboundedThreadPoolExecutor() as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=1) as executor:
>     run_perf(executor)
>   with futures.ThreadPoolExecutor(max_workers=12) as executor:
>     run_perf(executor)
> {code}
> Results:
> <apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7fab400dbe50> uses 268.160675049
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab40096290> uses 79.904583931
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab400dbe50> uses 191.179054976
> Profiling:
> UnboundedThreadPoolExecutor:
>  !profiling.png! 
> 1 Thread ThreadPoolExecutor:
>  !profiling_one_thread.png! 
> 12 Threads ThreadPoolExecutor:
>  !profiling_twelve_threads.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
