Hi everyone,

Thanks to all of you for the discussion. If there are no objections, I would like to start a vote thread tomorrow.
Best,
Xingbo

Xingbo Huang <hxbks...@gmail.com> wrote on Fri, Jan 7, 2022 at 16:18:
> Hi Till,
>
> I have written a more complicated PyFlink job. Compared with the previous
> single-Python-UDF job, there is an extra stage of converting between Table
> and DataStream. Besides, I added a Python map function to the job. Because
> the Python DataStream API has not yet implemented Thread Mode, the Python
> map function operator still runs in Process Mode.
>
> ```
> source = t_env.from_path("source_table")  # schema [id: String, d: int]
>
> @udf(result_type=DataTypes.STRING(), func_type="general")
> def upper(x):
>     return x.upper()
>
> t_env.create_temporary_system_function("upper", upper)
>
> # Python map function
> ds = t_env.to_data_stream(source) \
>     .map(lambda x: x,
>          output_type=Types.ROW_NAMED(["id", "d"],
>                                      [Types.STRING(), Types.INT()]))
>
> t = t_env.from_data_stream(ds)
> t.select('upper(id)').execute_insert('sink_table')
> ```
>
> The input data size is 1 KB.
>
> Mode                       | QPS
> Process Mode               | 30,000
> Thread Mode + Process Mode | 40,000
>
> From the table, we can see that the node running in Process Mode is the
> performance bottleneck of the job.
>
> Best,
> Xingbo
>
> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Jan 5, 2022 at 23:16:
>> Thanks for the detailed answer Xingbo. Quick question on the last figure
>> in the FLIP. You said that this is a real-world Flink Stream SQL job. The
>> title of the graph says UDF (String Upper). So do I understand correctly
>> that string upper is the real-world use case you have measured? What I
>> wanted to ask is how a slightly more complex Flink Python job (involving
>> shuffles, with back pressure, etc.) performs using the thread and process
>> mode respectively.
>>
>> If the mode solely needs changes in the Python part of Flink, then I don't
>> have any concerns from the runtime perspective.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jan 5, 2022 at 1:55 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>> > Hi Till and Thomas,
>> >
>> > Thanks a lot for joining the discussion.
>> >
>> > For Till:
>> >
>> > >>> Is the slower performance currently the biggest pain point for our
>> > Python users? What else are our Python users mainly complaining about?
>> >
>> > PyFlink users are most concerned about two things: one is better
>> > usability, the other is performance. Users often run benchmarks when
>> > they first investigate PyFlink [1][2] to decide whether to use it. The
>> > performance of a PyFlink job depends on two parts: the overhead of the
>> > PyFlink framework itself, and the complexity of the Python functions
>> > implemented by the user. In the Python ecosystem, there are many
>> > libraries and tools that can help Python users improve the performance
>> > of their custom functions, such as pandas [3], cython [4] and numba [5].
>> > So we hope that the framework overhead of PyFlink itself can also be
>> > reduced.
>> >
>> > >>> Concerning the proposed changes, are there any changes required on
>> > the runtime side (changes to Flink)? How will the deployment and memory
>> > management be affected when using the thread execution mode?
>> >
>> > The changes to the PyFlink runtime mentioned here are actually only
>> > modifications of PyFlink's custom operators, such as
>> > PythonScalarFunctionOperator [6], which won't affect deployment or
>> > memory management.
>> >
>> > >>> One more question that came to my mind: How much performance
>> > improvement do we gain on a real-world Python use case? Were the
>> > measurements more like micro benchmarks where the Python UDF was called
>> > w/o the overhead of Flink? I would just be curious how much the Python
>> > component contributes to the overall runtime of a real world job. Do we
>> > have some data on this?
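The framework-vs-UDF overhead split Xingbo describes can be sketched with a stdlib-only toy benchmark. Note this is only an illustration: `process_mode_step` and `thread_mode_step` are hypothetical stand-ins that use `pickle` to mimic a process-boundary crossing, not PyFlink's actual coders.

```python
import pickle
import time

def upper_udf(x: str) -> str:
    # Stand-in for the user-defined function: the real per-record work.
    return x.upper()

def process_mode_step(record: str) -> str:
    # Stand-in for Process Mode: each record crosses a process boundary,
    # so it is serialized and deserialized around the UDF call.
    payload = pickle.dumps(record)
    result = upper_udf(pickle.loads(payload))
    return pickle.loads(pickle.dumps(result))

def thread_mode_step(record: str) -> str:
    # Stand-in for Thread Mode: the UDF is invoked in-process, no ser/de.
    return upper_udf(record)

records = ["flink-" + str(i) for i in range(10_000)]

t0 = time.perf_counter()
out_process = [process_mode_step(r) for r in records]
t1 = time.perf_counter()
out_thread = [thread_mode_step(r) for r in records]
t2 = time.perf_counter()

# Both paths compute the same result; only the framework overhead differs.
assert out_process == out_thread
print(f"with ser/de: {t1 - t0:.4f}s, without ser/de: {t2 - t1:.4f}s")
```

The heavier the user function relative to the per-record framework cost, the smaller the measured gap becomes, which matches Thomas's observation below about realistic workloads.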
>> >
>> > The last figure I put in the FLIP is a performance comparison of three
>> > real Flink Stream SQL jobs: a Java UDF job, a Python UDF job in Process
>> > Mode, and a Python UDF job in Thread Mode. The QPS value is calculated
>> > from the end-to-end Flink job execution result. As shown in the
>> > comparison chart, a Python UDF with the same functionality often
>> > reaches only 20% of the performance of the Java UDF, so the Python UDF
>> > will often become the performance bottleneck in a PyFlink job.
>> >
>> > For Thomas:
>> >
>> > The first time I realized that the framework overhead of various IPC
>> > mechanisms (socket, gRPC, shared memory) cannot be ignored in some
>> > scenarios was due to an image-algorithm prediction job in PyFlink. Its
>> > input parameters were a series of huge image binary arrays, with a data
>> > size bigger than 1 GB. The serialization/deserialization overhead
>> > became an important part of its poor performance. Although this job is
>> > a bit extreme, through measurement we did find that the
>> > serialization/deserialization overhead caused by large parameters
>> > affects the performance of the IPC framework.
>> >
>> > >>> As I understand it, you measured the difference in throughput for
>> > UPPER between process and embedded mode and the difference is 50%
>> > increased throughput?
>> >
>> > This 50% is the result when the data size is less than 100 bytes. When
>> > the data size reaches 1 KB, the performance of Embedded Mode reaches
>> > about 3.5 times that of the Process Mode shown in the FLIP. When the
>> > data reaches 1 MB, Embedded Mode can reach 5 times the performance of
>> > Process Mode. The biggest difference here is that in Embedded Mode,
>> > input/result data does not need to be serialized/deserialized.
>> >
>> > >>> Is that a typical UDF in your usage?
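The payload-size effect described here can be reproduced with a small stdlib-only sketch. It uses `pickle` as a stand-in for the actual coder stack of the Beam-based Process Mode; the absolute numbers are not comparable to PyFlink's, only the growth with record size is the point.

```python
import pickle
import time

def serde_cost(payload: bytes, rounds: int = 20) -> float:
    """Average time for one serialize + deserialize round trip."""
    start = time.perf_counter()
    for _ in range(rounds):
        restored = pickle.loads(pickle.dumps(payload))
    assert restored == payload  # the round trip must be lossless
    return (time.perf_counter() - start) / rounds

# Ser/de cost grows with the size of the record shipped across the
# process boundary; in Embedded (Thread) Mode this cost disappears,
# which is why the speedup widens from 100 B to 1 KB to 1 MB records.
for size in (100, 1_000, 1_000_000):
    print(f"{size:>9} bytes: {serde_cost(b'x' * size) * 1e6:9.1f} us/round-trip")
```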
>> >
>> > The reason for choosing UPPER is that a simple UDF implementation makes
>> > it easier to evaluate the performance of the different execution modes.
>> >
>> > >>> What do you observe when the function becomes more complex?
>> >
>> > We can analyze the QPS of the framework (Process Mode or Embedded Mode)
>> > and the QPS of the UDF calculation logic separately. A more complex UDF
>> > simply means a UDF with lower QPS. The main factors that affect the
>> > framework QPS are the data types, number and size of the parameters,
>> > which greatly affect the serialization/deserialization overhead in
>> > Process Mode.
>> >
>> > The purpose of introducing Thread Mode is not to replace Process Mode,
>> > but to complement it in Python UDF usage scenarios such as CEP and
>> > join, and in scenarios where higher performance is pursued. Compared
>> > with Thread Mode, Process Mode has better isolation, which avoids the
>> > limitations of Thread Mode in some scenarios such as session mode.
>> >
>> > [1] https://www.mail-archive.com/user@flink.apache.org/msg42760.html
>> > [2] https://www.mail-archive.com/user@flink.apache.org/msg44975.html
>> > [3] https://pandas.pydata.org/
>> > [4] https://cython.org/
>> > [5] https://numba.pydata.org/
>> > [6] https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
>> >
>> > Best,
>> > Xingbo
>> >
>> > Thomas Weise <t...@apache.org> wrote on Tue, Jan 4, 2022 at 04:23:
>> > > Interesting discussion. It caught my attention because I was also
>> > > interested in the Beam fn execution overhead a few years ago.
>> > >
>> > > We found back then that while in theory the fn protocol overhead is
>> > > very significant, for realistic function workloads that overhead was
>> > > negligible. And of course it all depends on the use case.
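For readers landing on this thread later: FLIP-206 exposes the choice between the two modes as a job-level configuration option. The fragment below assumes the `python.execution-mode` key with values `process`/`thread` as described in the FLIP; check the released documentation for the exact option name and default.

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Select Thread (embedded) Mode for Python UDF execution. Operators that
# do not support Thread Mode yet (e.g. Python DataStream map at the time
# of this thread) keep running in Process Mode.
t_env.get_config().get_configuration().set_string(
    "python.execution-mode", "thread")
```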
It might be
>> > > worthwhile to quantify a couple more scenarios.
>> > >
>> > > As I understand it, you measured the difference in throughput for
>> > > UPPER between process and embedded mode and the difference is 50%
>> > > increased throughput? Is that a typical UDF in your usage? What do
>> > > you observe when the function becomes more complex?
>> > >
>> > > Thanks,
>> > > Thomas
>> > >
>> > > On Mon, Jan 3, 2022 at 5:52 AM Till Rohrmann <trohrm...@apache.org> wrote:
>> > > >
>> > > > One more question that came to my mind: How much performance
>> > > > improvement do we gain on a real-world Python use case? Were the
>> > > > measurements more like micro benchmarks where the Python UDF was
>> > > > called w/o the overhead of Flink? I would just be curious how much
>> > > > the Python component contributes to the overall runtime of a real
>> > > > world job. Do we have some data on this?
>> > > >
>> > > > Cheers,
>> > > > Till
>> > > >
>> > > > On Mon, Jan 3, 2022 at 11:45 AM Till Rohrmann <trohrm...@apache.org> wrote:
>> > > >
>> > > > > Hi Xingbo,
>> > > > >
>> > > > > Thanks for creating this FLIP. I have two general questions about
>> > > > > the motivation for this FLIP because I have only very little
>> > > > > exposure to our Python users:
>> > > > >
>> > > > > Is the slower performance currently the biggest pain point for
>> > > > > our Python users?
>> > > > >
>> > > > > What else are our Python users mainly complaining about?
>> > > > >
>> > > > > Concerning the proposed changes, are there any changes required
>> > > > > on the runtime side (changes to Flink)? How will the deployment
>> > > > > and memory management be affected when using the thread execution
>> > > > > mode?
>> > > > >
>> > > > > Cheers,
>> > > > > Till
>> > > > >
>> > > > > On Fri, Dec 31, 2021 at 9:46 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>> > > > >
>> > > > >> Hi Wei,
>> > > > >>
>> > > > >> Thanks a lot for your feedback. Very good questions!
>> > > > >>
>> > > > >> >>> 1. It seems that we dynamically load an embedded Python and
>> > > > >> user dependencies in the TM process. Can they be uninstalled
>> > > > >> cleanly after the task finishes? i.e. Can we use Thread Mode in
>> > > > >> session mode and the PyFlink shell?
>> > > > >>
>> > > > >> I mentioned the limitation of this part in the FLIP. There is no
>> > > > >> problem as long as the Python interpreter does not change, but
>> > > > >> if you need to change the Python interpreter, there is really no
>> > > > >> way to reload the Python library. The problem is mainly caused
>> > > > >> by many Python libraries assuming that they own the process
>> > > > >> alone.
>> > > > >>
>> > > > >> >>> 2. Does one TM have only one embedded Python running at the
>> > > > >> same time? If all the Python operators in the TM share the same
>> > > > >> PVM, will there be a loss in performance?
>> > > > >>
>> > > > >> Your understanding is correct: one TM has only one embedded
>> > > > >> Python running at the same time. I guess you are worried about
>> > > > >> the performance loss of multiple threads caused by the Python
>> > > > >> GIL. There is a one-to-one correspondence between Java worker
>> > > > >> threads and Python subinterpreters. Although subinterpreters
>> > > > >> have not yet completely overcome the GIL sharing problem (the
>> > > > >> Python community's recent plan for a per-interpreter GIL is also
>> > > > >> under discussion [1]), the performance of subinterpreters is
>> > > > >> very close to that of multiprocessing [2].
>> > > > >>
>> > > > >> >>> 3.
>> > > > >> How do we load the relevant C library if the python.executable
>> > > > >> is provided by users?
>> > > > >>
>> > > > >> Once python.executable is provided, PEMJA will dynamically load
>> > > > >> the CPython library (libpython.*so or libpython.*dylib) and the
>> > > > >> pemja.so installed in that Python environment.
>> > > > >>
>> > > > >> >>> May there be a risk of version conflicts?
>> > > > >>
>> > > > >> I understand that this question is actually about whether C/C++
>> > > > >> has a way to solve the problem of depending on different
>> > > > >> versions of a library. First of all, we know that if there is
>> > > > >> only static linking, there will be no such problem. I have
>> > > > >> studied the source code of CPython [3], and there is no usage of
>> > > > >> dynamic linking there. What remains is the case where dynamic
>> > > > >> linking is used in a C library written by the user. There are
>> > > > >> many ways to solve this problem with dynamic linking, but since
>> > > > >> this library is written by the user, it is difficult for us to
>> > > > >> guarantee that there will be no conflicts. In that case, Process
>> > > > >> Mode will be the fallback choice.
>> > > > >>
>> > > > >> [1] https://mail.python.org/archives/list/python-...@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV
>> > > > >> [2] https://mail.python.org/archives/list/python-...@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ
>> > > > >> [3] https://github.com/python/cpython
>> > > > >>
>> > > > >> Best,
>> > > > >> Xingbo
>> > > > >>
>> > > > >> Wei Zhong <weizhong0...@gmail.com> wrote on Fri, Dec 31, 2021 at 11:49:
>> > > > >> > Hi Xingbo,
>> > > > >> >
>> > > > >> > Thanks for creating this FLIP. Big +1 for it!
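As an aside, the kind of dynamic symbol lookup PEMJA performs against the CPython library can be illustrated from Python itself with the stdlib `ctypes` module, which exposes the already-loaded libpython as `ctypes.pythonapi`. This is only an analogy for the dlopen-and-resolve pattern, not PEMJA's actual loading code.

```python
import ctypes
import sys

# ctypes.pythonapi is a handle to the CPython C library already loaded
# into this process; symbols are resolved in it dynamically, similar to
# how an embedding framework dlopens libpython and looks up the C-API
# functions it needs.
ctypes.pythonapi.Py_GetVersion.restype = ctypes.c_char_p
version = ctypes.pythonapi.Py_GetVersion().decode()

# The embedded library must match the interpreter that loaded it,
# which is exactly where version conflicts would surface.
assert version.startswith("%d.%d" % sys.version_info[:2])
print("embedded CPython:", version)
```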
>> > > > >> >
>> > > > >> > I have some questions about the Thread Mode:
>> > > > >> >
>> > > > >> > 1. It seems that we dynamically load an embedded Python and
>> > > > >> > user dependencies in the TM process. Can they be uninstalled
>> > > > >> > cleanly after the task finishes? i.e. Can we use the Thread
>> > > > >> > Mode in session mode and the PyFlink shell?
>> > > > >> >
>> > > > >> > 2. Does one TM have only one embedded Python running at the
>> > > > >> > same time? If all the Python operators in the TM share the
>> > > > >> > same PVM, will there be a loss in performance?
>> > > > >> >
>> > > > >> > 3. How do we load the relevant C library if the
>> > > > >> > python.executable is provided by users? May there be a risk
>> > > > >> > of version conflicts?
>> > > > >> >
>> > > > >> > Best,
>> > > > >> > Wei
>> > > > >> >
>> > > > >> >
>> > > > >> > > On Dec 29, 2021, at 11:56 AM, Xingbo Huang <hxbks...@gmail.com> wrote:
>> > > > >> > >
>> > > > >> > > Hi everyone,
>> > > > >> > >
>> > > > >> > > I would like to start a discussion thread on "Support
>> > > > >> > > PyFlink Runtime Execution in Thread Mode".
>> > > > >> > >
>> > > > >> > > We have provided the PyFlink Runtime framework to support
>> > > > >> > > Python user-defined functions since Flink 1.10. The PyFlink
>> > > > >> > > Runtime framework is called Process Mode, which depends on
>> > > > >> > > an inter-process communication architecture based on the
>> > > > >> > > Apache Beam Portability framework. Although starting a
>> > > > >> > > dedicated process to execute Python user-defined functions
>> > > > >> > > provides better resource isolation, it brings greater
>> > > > >> > > resource and performance overhead.
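Background for Wei's question 2: under a single shared GIL, CPU-bound work in multiple threads of one interpreter does not run in parallel, which is why the design maps each Java worker thread to its own subinterpreter. A minimal stdlib-only illustration with plain threads follows; it deliberately does not use the subinterpreter API, which is not publicly exposed in Python.

```python
import threading

def busy_sum(n: int, out: list, idx: int) -> None:
    # CPU-bound task: with one shared GIL, two such threads in a single
    # interpreter take roughly as long as running the work sequentially,
    # because only one thread executes Python bytecode at a time.
    out[idx] = sum(range(n))

results = [0, 0]
threads = [
    threading.Thread(target=busy_sum, args=(1_000_000, results, i))
    for i in range(2)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Correctness is unaffected by the GIL; only parallelism is limited.
assert results[0] == results[1] == sum(range(1_000_000))
print("both threads finished:", results[0])
```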
>> > > > >> > >
>> > > > >> > > In order to overcome the resource and performance problems
>> > > > >> > > of Process Mode, we propose a new execution mode which
>> > > > >> > > executes Python user-defined functions in the same thread
>> > > > >> > > instead of in a separate process.
>> > > > >> > >
>> > > > >> > > I have drafted FLIP-206 [1]. Please feel free to reply to
>> > > > >> > > this email thread. Looking forward to your feedback!
>> > > > >> > >
>> > > > >> > > Best,
>> > > > >> > > Xingbo
>> > > > >> > >
>> > > > >> > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode