[
https://issues.apache.org/jira/browse/BEAM-8944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072840#comment-17072840
]
Luke Cwik commented on BEAM-8944:
---------------------------------
So the original analysis done in the description of the bug isn't typical usage,
because it compares creating `M` threads which each process one item vs
creating a fixed number of threads which then process the items off a queue. It
is easy to see that the performance difference will come down to the number of
threads created. In typical usage, threads would be reused many times before
being garbage collected.
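To make the distinction concrete, here is a toy comparison of the two setups; the item count, the no-op task, and the pool size are illustrative choices, not numbers from the bug's benchmark:

{code:python}
import threading
import time
from concurrent.futures import ThreadPoolExecutor

N_ITEMS = 10000  # illustrative workload size

def task():
    pass  # trivial work, so timing is dominated by scheduling overhead

# One freshly created thread per item (what the bug's benchmark measures).
start = time.time()
for _ in range(N_ITEMS):
    t = threading.Thread(target=task)
    t.start()
    t.join()
per_item_threads = time.time() - start

# A fixed pool whose threads are reused across items (typical usage).
start = time.time()
with ThreadPoolExecutor(max_workers=4) as pool:
    for _ in range(N_ITEMS):
        pool.submit(task)
pool_reuse = time.time() - start

print('new thread per item: %.3fs' % per_item_threads)
print('reused pool threads: %.3fs' % pool_reuse)
{code}

The gap here measures thread-creation cost, not executor quality, which is the point above.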
Max, for your analysis in the above comment:
Is Flink now able to schedule much more work, since the Python SDK harness will
run an unbounded number of threads with the UnboundedThreadPoolExecutor vs the
fixed number it supported before (so increased contention)?
Is it possible that ThreadPoolExecutor has a C++ implementation which is making
up for the difference?
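On the second question, one quick check is whether `concurrent.futures.thread` resolves to a Python source file. In CPython's standard library the executor itself is pure Python (though it may sit on C-accelerated primitives like locks and queues underneath), so this is a sketch of the check rather than a definitive answer for every interpreter build:

{code:python}
import concurrent.futures.thread
import inspect

# A C extension module would have no Python source file;
# a pure-Python module resolves to a .py path.
src = inspect.getsourcefile(concurrent.futures.thread)
print(src)
{code}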
Do you have timing information comparing the two runs of the two
implementations?
Do you have a graph showing how many threads are alive in the Python process
between the two runs?
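For the last question, the live thread count can be sampled from inside the Python process with something like the following; `sample_thread_counts` and its interval/duration parameters are hypothetical names for illustration:

{code:python}
import threading
import time

def sample_thread_counts(duration_s=1.0, interval_s=0.1):
    """Record (elapsed_seconds, live_thread_count) pairs over a window."""
    samples = []
    start = time.time()
    while time.time() - start < duration_s:
        samples.append((time.time() - start, threading.active_count()))
        time.sleep(interval_s)
    return samples

for elapsed, count in sample_thread_counts():
    print('%.1fs: %d threads alive' % (elapsed, count))
{code}

Plotting those samples for both executor configurations would show whether the unbounded pool is actually growing the thread count during the run.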
> Python SDK harness performance degradation with UnboundedThreadPoolExecutor
> ---------------------------------------------------------------------------
>
> Key: BEAM-8944
> URL: https://issues.apache.org/jira/browse/BEAM-8944
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-harness
> Affects Versions: 2.18.0
> Reporter: Yichi Zhang
> Priority: Critical
> Fix For: 2.18.0
>
> Attachments: checkpoint-duration.png, profiling.png,
> profiling_one_thread.png, profiling_twelve_threads.png
>
> Time Spent: 4h 20m
> Remaining Estimate: 0h
>
> We are seeing a performance degradation for python streaming word count load
> tests.
>
> After some investigation, it appears to be caused by swapping the original
> ThreadPoolExecutor for the UnboundedThreadPoolExecutor in the SDK worker. The
> suspicion is that Python performance is worse with more threads on CPU-bound
> tasks.
>
> A simple test for comparing the multiple thread pool executor performance:
>
> {code:python}
>   def test_performance(self):
>     def run_perf(executor):
>       total_number = 1000000
>       q = queue.Queue()
>
>       def task(number):
>         hash(number)
>         q.put(number + 200)
>         return number
>
>       t = time.time()
>       count = 0
>       for i in range(200):
>         q.put(i)
>
>       while count < total_number:
>         executor.submit(task, q.get(block=True))
>         count += 1
>       print('%s uses %s' % (executor, time.time() - t))
>
>     with UnboundedThreadPoolExecutor() as executor:
>       run_perf(executor)
>
>     with futures.ThreadPoolExecutor(max_workers=1) as executor:
>       run_perf(executor)
>
>     with futures.ThreadPoolExecutor(max_workers=12) as executor:
>       run_perf(executor)
> {code}
> Results:
> {noformat}
> <apache_beam.utils.thread_pool_executor.UnboundedThreadPoolExecutor object at 0x7fab400dbe50> uses 268.160675049
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab40096290> uses 79.904583931
> <concurrent.futures.thread.ThreadPoolExecutor object at 0x7fab400dbe50> uses 191.179054976
> {noformat}
> Profiling:
> UnboundedThreadPoolExecutor:
> !profiling.png!
> 1 Thread ThreadPoolExecutor:
> !profiling_one_thread.png!
> 12 Threads ThreadPoolExecutor:
> !profiling_twelve_threads.png!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)