[ https://issues.apache.org/jira/browse/BEAM-12781?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17402329#comment-17402329 ]
Pablo Estrada commented on BEAM-12781:
--------------------------------------
Okay, I finally have a reasonable theory of what's going on here. I believe the
issue is caused by the {{split}} call of a BoundedSource being separated into
its own stage. On the unified worker (Runner V2) it looks like we pickle the
intermediate result of that split before handing it to the next stage.
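To make the theory concrete, here is a minimal, self-contained sketch (not the actual worker code path, which goes through the SDK's coders): it builds a toy {{BoundedSource}}, calls {{split()}}, and times {{pickle.dumps}} over the resulting {{SourceBundle}} objects, which is roughly the per-bundle serialization a separate split stage has to pay for. {{_FakeSource}} and the bundle counts are invented for the illustration; only {{iobase.BoundedSource}}, {{iobase.SourceBundle}}, {{OffsetRangeTracker}} and {{pickle.dumps}} are real APIs.
{{import pickle
import time

from apache_beam.io import iobase
from apache_beam.io.range_trackers import OffsetRangeTracker


class _FakeSource(iobase.BoundedSource):
  # Hypothetical stand-in for a real source such as the BigQuery export source.

  def __init__(self, start, end):
    self._start = start
    self._end = end

  def estimate_size(self):
    return self._end - self._start

  def get_range_tracker(self, start_position, stop_position):
    return OffsetRangeTracker(
        self._start if start_position is None else start_position,
        self._end if stop_position is None else stop_position)

  def read(self, range_tracker):
    # Simplified read; never exercised in this sketch.
    return iter(range(range_tracker.start_position(),
                      range_tracker.stop_position()))

  def split(self, desired_bundle_size, start_position=None, stop_position=None):
    start = self._start if start_position is None else start_position
    stop = self._end if stop_position is None else stop_position
    while start < stop:
      end = min(start + desired_bundle_size, stop)
      yield iobase.SourceBundle(
          weight=end - start,
          source=_FakeSource(start, end),
          start_position=start,
          stop_position=end)
      start = end


if __name__ == '__main__':
  # ~20k bundles, made-up numbers in the same ballpark as the profile below.
  bundles = list(_FakeSource(0, 20_000_000).split(desired_bundle_size=1_000))
  started = time.time()
  # Analogous to serializing each split result at a stage boundary.
  blobs = [pickle.dumps(b) for b in bundles]
  print('pickled %d bundles in %.3fs (%d bytes)' %
        (len(blobs), time.time() - started, sum(len(b) for b in blobs)))}}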
Profiling a Runner V2 worker shows that the slowest call is {{pickle.dumps}}:
{{In [8]: p.sort_stats('tottime').print_stats()
Tue Aug 17 11:47:42 2021    profilingmofilingba/cpu_profile/2021-08-17_18_43_19-process_bundle-5799522238727205118-4

         538200134 function calls (537540317 primitive calls) in 2278.252 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    21043 1021.509    0.049 1335.965    0.063 \{built-in method _pickle.dumps}}}
That function alone adds over 1,000 seconds of unparallelizable work (21043
calls at roughly 0.049 s each) to a 1 TB pipeline.
In the Runner V1 profile, this function does not show up at all (in fact, the
functions that encode the data are extremely fast):
{{Out[20]: <pstats.Stats at 0x7f158b159760>

In [21]: p.sort_stats('cumtime').print_stats()
Tue Aug 17 14:14:23 2021    profilingmofiling_runnerv1/cpu_profile/2021-08-17_21_08_44-8751929660737608456

         157945660 function calls (153771026 primitive calls) in 977.880 seconds

   Ordered by: cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000  978.007  978.007 /usr/local/lib/python3.8/site-packages/dataflow_worker/batchworker.py:623(do_work)
        1    0.000    0.000  966.360  966.360 /usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py:224(execute)
        1    0.003    0.003  966.360  966.360 /usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py:228(_perform_source_split_considering_api_limits)
        1    0.022    0.022  958.413  958.413 /usr/local/lib/python3.8/site-packages/dataflow_worker/executor.py:264(_perform_source_split)
    10321    0.005    0.000  940.065    0.091 /usr/local/lib/python3.8/site-packages/apache_beam/io/gcp/bigquery.py:774(split)
    ... (several lines elided) ...
   460021    3.152    0.000   18.662    0.000 /usr/local/lib/python3.8/site-packages/apitools/base/protorpclite/messages.py:761(__init__)
 142141/52122   0.290    0.000   18.447    0.000 /usr/local/lib/python3.8/json/encoder.py:182(encode)
        1    0.000    0.000   18.303   18.303 /usr/local/lib/python3.8/site-packages/dataflow_worker/workerapiclient.py:593(splits_to_split_response)
        1    0.005    0.005   18.294   18.294 /usr/local/lib/python3.8/site-packages/dataflow_worker/workerapiclient.py:601(<listcomp>)
     5000    0.092    0.000   18.289    0.004 /usr/local/lib/python3.8/site-packages/dataflow_worker/workerapiclient.py:614(_source_bundle_to_derived_source)}}
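For reference, the two profiles above can be inspected with the standard {{pstats}} module; a minimal snippet (the profile filename is just a placeholder for the cpu_profile file pulled from the worker) is:
{{import pstats

# Placeholder path; substitute the cpu_profile file downloaded from the worker.
p = pstats.Stats('cpu_profile/worker_profile.stats')
p.sort_stats('tottime').print_stats(20)  # Runner V2: _pickle.dumps dominates
p.sort_stats('cumtime').print_stats(20)  # Runner V1: time sits under _perform_source_split}}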
> SDFBoundedSourceReader behaves much slower compared with the original
> behavior of BoundedSource
> -----------------------------------------------------------------------------------------------
>
> Key: BEAM-12781
> URL: https://issues.apache.org/jira/browse/BEAM-12781
> Project: Beam
> Issue Type: Bug
> Components: io-py-common
> Reporter: Pablo Estrada
> Assignee: Pablo Estrada
> Priority: P2
>