Hi everyone,

Thanks to all of you for the discussion.
If there are no objections, I would like to start a vote thread tomorrow.

Best,
Xingbo

Xingbo Huang <hxbks...@gmail.com> wrote on Fri, Jan 7, 2022 at 16:18:

> Hi Till,
>
> I have written a more complicated PyFlink job. Compared with the previous
> single-Python-UDF job, it has an extra stage that converts between Table
> and DataStream. Besides, I added a Python map function to the job. Because
> the Python DataStream API has not yet implemented Thread Mode, the Python
> map function operator still runs in Process Mode.
>
> ```
> source = t_env.from_path("source_table")  # schema [id: String, d: int]
>
> @udf(result_type=DataTypes.STRING(), func_type="general")
> def upper(x):
>     return x.upper()
>
> t_env.create_temporary_system_function("upper", upper)
>
> # Python map function
> ds = t_env.to_data_stream(source) \
>     .map(lambda x: x,
>          output_type=Types.ROW_NAMED(["id", "d"],
>                                      [Types.STRING(), Types.INT()]))
>
> t = t_env.from_data_stream(ds)
> t.select('upper(id)').execute_insert('sink_table')
> ```
>
> The size of each input record is 1 KB.
>
> Mode                                 |   QPS
> Process Mode                         |   30,000
> Thread Mode + Process Mode           |   40,000
>
> From the table, we can see that the nodes running in Process Mode are the
> performance bottleneck of the job.
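To make the bottleneck argument concrete: in a pipelined job, sustained end-to-end throughput is bounded by the slowest operator. A minimal back-of-envelope sketch of that reasoning (the stage names and QPS numbers are hypothetical, not measurements from the job above):

```python
# Back-of-envelope model: the end-to-end QPS of a pipelined job is
# bounded by its slowest stage. All stage names and QPS numbers below
# are hypothetical, not measurements from the job in this thread.
def pipeline_qps(stage_qps):
    """Sustained throughput of a fully pipelined chain of stages."""
    return min(stage_qps.values())

stages = {
    "source": 200_000,
    "python_map_process_mode": 40_000,  # pays IPC + serde per record
    "python_udf_thread_mode": 90_000,   # no per-record serde
    "sink": 150_000,
}

print(pipeline_qps(stages))  # prints 40000
```

Under this model, speeding up any stage other than the slowest one leaves end-to-end QPS unchanged, which is why the remaining Process Mode operator limits the job above.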
>
> Best,
> Xingbo
>
> Till Rohrmann <trohrm...@apache.org> wrote on Wed, Jan 5, 2022 at 23:16:
>
>> Thanks for the detailed answer Xingbo. Quick question on the last figure
>> in
>> the FLIP. You said that this is a real world Flink stream SQL job. The
>> title of the graph says UDF(String Upper). So do I understand correctly
>> that string upper is the real world use case you have measured? What I
>> wanted to ask is how a slightly more complex Flink Python job (involving
>> shuffles, with back pressure, etc.) performs using the thread and process
>> mode respectively.
>>
>> If the mode solely needs changes in the Python part of Flink, then I don't
>> have any concerns from the runtime perspective.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jan 5, 2022 at 1:55 PM Xingbo Huang <hxbks...@gmail.com> wrote:
>>
>> > Hi Till and Thomas,
>> >
>> > Thanks a lot for joining the discussion.
>> >
>> > For Till:
>> >
>> > >>> Is the slower performance currently the biggest pain point for our
>> > Python users? What else are our Python users mainly complaining about?
>> >
>> > PyFlink users care most about two things: one is usability, the other
>> > is performance. Users often run benchmarks when they first investigate
>> > PyFlink [1][2] to decide whether to adopt it. The performance of a
>> > PyFlink job depends on two parts: the overhead of the PyFlink framework
>> > itself, and the complexity of the Python functions implemented by the
>> > user. In the Python ecosystem there are many libraries and tools that
>> > help users improve the performance of their custom functions, such as
>> > pandas[3], numba[4] and cython[5]. So we hope that the framework
>> > overhead of PyFlink itself can also be reduced.
>> >
>> > >>> Concerning the proposed changes, are there any changes required on
>> the
>> > runtime side (changes to Flink)? How will the deployment and memory
>> > management be affected when using the thread execution mode?
>> >
>> > The changes to the PyFlink runtime mentioned here are actually only
>> > modifications of the PyFlink custom operators, such as
>> > PythonScalarFunctionOperator[6], which won't affect deployment or
>> > memory management.
>> >
>> > >>> One more question that came to my mind: How much performance
>> > improvement do we gain on a real-world Python use case? Were the
>> > measurements more like micro benchmarks where the Python UDF was
>> > called w/o the overhead of Flink? I would just be curious how much the
>> > Python component contributes to the overall runtime of a real world
>> > job. Do we have some data on this?
>> >
>> > The last figure I put in the FLIP is a performance comparison of three
>> > real Flink Stream SQL jobs: a Java UDF job, a Python UDF job in
>> > Process Mode, and a Python UDF job in Thread Mode. The QPS values are
>> > measured end-to-end on the running Flink jobs. As the comparison chart
>> > shows, a Python UDF with the same functionality often reaches only 20%
>> > of the performance of the corresponding Java UDF, so the Python UDF
>> > often becomes the performance bottleneck in a PyFlink job.
>> >
>> > For Thomas:
>> >
>> > The first time I realized that the framework overhead of the various
>> > IPC mechanisms (socket, gRPC, shared memory) cannot be ignored in some
>> > scenarios was with an image-algorithm prediction job in PyFlink. Its
>> > input parameters are a series of huge image binary arrays with a data
>> > size bigger than 1 GB, and the serialization/deserialization overhead
>> > became an important part of its poor performance. Although this job is
>> > a bit extreme, through measurement we did confirm the impact that
>> > larger parameters have on the serialization/deserialization overhead
>> > of the IPC framework.
>> >
>> > >>> As I understand it, you measured the difference in throughput for
>> UPPER
>> > between process and embedded mode and the difference is 50% increased
>> > throughput?
>> >
>> > This 50% is the result when the data size is less than 100 bytes. When
>> > the data size reaches 1 KB, Embedded Mode reaches about 3.5 times the
>> > performance of the Process Mode shown in the FLIP. When the data
>> > reaches 1 MB, Embedded Mode can reach 5 times the performance of
>> > Process Mode. The biggest difference is that in Embedded Mode, the
>> > input/result data does not need to be serialized/deserialized.
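The size effect described above is easy to reproduce outside Flink. The sketch below uses plain pickle round trips as a stand-in for the real coder stack (PyFlink's Process Mode actually uses Beam's coders over gRPC, so the absolute numbers differ; only the scaling trend is the point):

```python
import pickle
import time

def serde_cost(payload, rounds=50):
    """Rough per-record serialize + deserialize time, in seconds."""
    start = time.perf_counter()
    for _ in range(rounds):
        restored = pickle.loads(pickle.dumps(payload))
    assert restored == payload  # the round trip must be lossless
    return (time.perf_counter() - start) / rounds

# Roughly 100 B, 1 KB and 1 MB string payloads, mirroring the sizes above.
for size in (100, 1_000, 1_000_000):
    row = {"id": "x" * size, "d": 42}
    print(size, f"{serde_cost(row):.2e} s/record")
```

The per-record cost should grow by orders of magnitude between the 100 B and 1 MB payloads, which matches the direction of the Process Mode numbers quoted above.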
>> >
>> > >>> Is that a typical UDF in your usage?
>> >
>> > The reason for choosing UPPER is that a simple UDF implementation
>> > makes it easier to evaluate the overhead of the different execution
>> > modes themselves.
>> >
>> > >>> What do you observe when the function becomes more complex?
>> >
>> > We can analyze the QPS of the framework (Process Mode or Embedded
>> > Mode) and the QPS of the UDF calculation logic separately. A more
>> > complex UDF simply means a UDF with a smaller QPS of its own. The main
>> > factors that affect the framework QPS are the data types, the number,
>> > and the size of the parameters, which greatly affect the
>> > serialization/deserialization overhead in Process Mode.
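One way to make this split explicit is an additive cost model: per-record time is framework overhead plus UDF time, so the observed QPS combines harmonically. The sketch below is just that arithmetic, with illustrative numbers (not taken from the FLIP benchmarks):

```python
# Additive cost model: time per record = framework overhead + UDF time,
# so the observed QPS combines harmonically. All numbers are illustrative.
def observed_qps(framework_qps, udf_qps):
    return 1.0 / (1.0 / framework_qps + 1.0 / udf_qps)

# Cheap UDF: framework overhead dominates, so the execution mode matters.
print(round(observed_qps(framework_qps=30_000, udf_qps=1_000_000)))
# Complex (slow) UDF: its own cost dominates and the mode matters less.
print(round(observed_qps(framework_qps=30_000, udf_qps=2_000)))  # prints 1875
```

This also shows why a complex, low-QPS UDF masks the difference between the modes: the framework term stops dominating the per-record time.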
>> >
>> > The purpose of introducing Thread Mode is not to replace Process Mode,
>> > but to complement it: it covers Python UDF usage scenarios such as CEP
>> > and join, as well as scenarios where higher performance is pursued.
>> > Compared with Thread Mode, Process Mode has better isolation, which
>> > avoids the limitations that Thread Mode has in some scenarios such as
>> > session mode.
>> >
>> > [1] https://www.mail-archive.com/user@flink.apache.org/msg42760.html
>> > [2] https://www.mail-archive.com/user@flink.apache.org/msg44975.html
>> > [3] https://pandas.pydata.org/
>> > [4] https://numba.pydata.org/
>> > [5] https://cython.org/
>> > [6]
>> >
>> >
>> https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
>> >
>> > Best,
>> > Xingbo
>> >
>> > Thomas Weise <t...@apache.org> wrote on Tue, Jan 4, 2022 at 04:23:
>> >
>> > > Interesting discussion. It caught my attention because I was also
>> > > interested in the Beam fn execution overhead a few years ago.
>> > >
>> > > We found back then that while in theory the fn protocol overhead is
>> > > very significant, for realistic function workloads that overhead was
>> > > negligible. And of course it all depends on the use case. It might be
>> > > worthwhile to quantify a couple more scenarios.
>> > >
>> > > As I understand it, you measured the difference in throughput for
>> > > UPPER between process and embedded mode and the difference is 50%
>> > > increased throughput? Is that a typical UDF in your usage? What do you
>> > > observe when the function becomes more complex?
>> > >
>> > > Thanks,
>> > > Thomas
>> > >
>> > > On Mon, Jan 3, 2022 at 5:52 AM Till Rohrmann <trohrm...@apache.org>
>> > wrote:
>> > > >
>> > > > One more question that came to my mind: How much performance
>> > improvement
>> > > do
>> > > > we gain on a real-world Python use case? Were the measurements more
>> > like
>> > > > micro benchmarks where the Python UDF was called w/o the overhead of
>> > > Flink?
>> > > > I would just be curious how much the Python component contributes to
>> > the
>> > > > overall runtime of a real world job. Do we have some data on this?
>> > > >
>> > > > Cheers,
>> > > > Till
>> > > >
>> > > > On Mon, Jan 3, 2022 at 11:45 AM Till Rohrmann <trohrm...@apache.org
>> >
>> > > wrote:
>> > > >
>> > > > > Hi Xingbo,
>> > > > >
>> > > > > Thanks for creating this FLIP. I have two general questions about
>> the
>> > > > > motivation for this FLIP because I have only very little exposure
>> to
>> > > our
>> > > > > Python users:
>> > > > >
>> > > > > Is the slower performance currently the biggest pain point for our
>> > > Python
>> > > > > users?
>> > > > >
>> > > > > What else are our Python users mainly complaining about?
>> > > > >
>> > > > > Concerning the proposed changes, are there any changes required on
>> > the
>> > > > > runtime side (changes to Flink)? How will the deployment and
>> memory
>> > > > > management be affected when using the thread execution mode?
>> > > > >
>> > > > > Cheers,
>> > > > > Till
>> > > > >
>> > > > > On Fri, Dec 31, 2021 at 9:46 AM Xingbo Huang <hxbks...@gmail.com>
>> > > wrote:
>> > > > >
>> > > > >> Hi Wei,
>> > > > >>
>> > > > >> Thanks a lot for your feedback. Very good questions!
>> > > > >>
>> > > > >> >>> 1. It seems that we dynamically load an embedded Python and
>> user
>> > > > >> dependencies in the TM process. Can they be uninstalled cleanly
>> > after
>> > > the
>> > > > >> task finished? i.e. Can we use the Thread Mode in session mode
>> and
>> > > Pyflink
>> > > > >> shell?
>> > > > >>
>> > > > >> I mentioned this limitation in the FLIP. There is no problem as
>> > > > >> long as the Python interpreter does not change, but if the
>> > > > >> Python interpreter needs to be changed, there is really no way
>> > > > >> to reload the Python libraries. The problem is mainly that many
>> > > > >> Python libraries assume that they own the process alone.
>> > > > >>
>> > > > >> >>> 2. Does one TM have only one embedded Python running at the
>> same
>> > > time?
>> > > > >> If all the Python operator in the TM share the same PVM, will
>> there
>> > > be a
>> > > > >> loss in performance?
>> > > > >>
>> > > > >> Your understanding is correct: one TM has only one embedded
>> > > > >> Python runtime at a time. I guess you are worried about the
>> > > > >> performance loss of multiple threads caused by the Python GIL.
>> > > > >> There is a one-to-one correspondence between Java worker
>> > > > >> threads and Python sub-interpreters. Although sub-interpreters
>> > > > >> have not yet completely overcome the GIL sharing problem (the
>> > > > >> Python community's recent plan for a per-interpreter GIL is
>> > > > >> still under discussion [1]), the performance of
>> > > > >> sub-interpreters is very close to that of multiprocessing [2].
>> > > > >>
>> > > > >> >>> 3. How do we load the relevant c library if the
>> > python.executable
>> > > is
>> > > > >> provided by users?
>> > > > >>
>> > > > >> Once python.executable is provided, PEMJA will dynamically
>> > > > >> load the CPython library (libpython*.so or libpython*.dylib)
>> > > > >> and the pemja.so installed in that Python environment.
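For reference, the interpreter that ends up being loaded is the one selected through the Python executable settings. A sketch of the relevant configuration follows; the key names come from the Flink configuration docs rather than this thread, so treat them as an assumption and check your version:

```
# flink-conf.yaml (can also be set programmatically on the job config)
python.executable: venv/bin/python3         # interpreter used on the TaskManager side
python.client.executable: venv/bin/python3  # interpreter used when compiling the job
```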
>> > > > >>
>> > > > >> >>> May there be a risk of version conflicts?
>> > > > >>
>> > > > >> I understand that this question is really about whether C/C++
>> > > > >> has a way to handle depending on different versions of the
>> > > > >> same library. First of all, we know that if there is only
>> > > > >> static linking, there is no such problem. I have studied the
>> > > > >> source code of CPython[3], and it does not use dynamic linking.
>> > > > >> What remains is the case where dynamic linking is used in a C
>> > > > >> library written by the users. There are many ways to deal with
>> > > > >> such dynamic-linking conflicts, but since these libraries are
>> > > > >> written by users, it is difficult for us to guarantee that
>> > > > >> there will be no conflicts. In that case, Process Mode will be
>> > > > >> the fall-back choice.
>> > > > >>
>> > > > >> [1]
>> > > > >>
>> > > > >>
>> > >
>> >
>> https://mail.python.org/archives/list/python-...@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV
>> > > > >> [2]
>> > > > >>
>> > > > >>
>> > >
>> >
>> https://mail.python.org/archives/list/python-...@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ
>> > > > >> [3] https://github.com/python/cpython
>> > > > >>
>> > > > >> Best,
>> > > > >> Xingbo
>> > > > >>
>> > > > >> Wei Zhong <weizhong0...@gmail.com> wrote on Fri, Dec 31, 2021 at 11:49:
>> > > > >>
>> > > > >> > Hi Xingbo,
>> > > > >> >
>> > > > >> > Thanks for creating this FLIP. Big +1 for it!
>> > > > >> >
>> > > > >> > I have some question about the Thread Mode:
>> > > > >> >
>> > > > >> > 1. It seems that we dynamically load an embedded Python and
>> user
>> > > > >> > dependencies in the TM process. Can they be uninstalled cleanly
>> > > after
>> > > > >> the
>> > > > >> > task finished? i.e. Can we use the Thread Mode in session mode
>> and
>> > > > >> Pyflink
>> > > > >> > shell?
>> > > > >> >
>> > > > >> > 2. Does one TM have only one embedded Python running at the
>> same
>> > > time?
>> > > > >> If
>> > > > >> > all the Python operator in the TM share the same PVM, will
>> there
>> > be
>> > > a
>> > > > >> loss
>> > > > >> > in performance?
>> > > > >> >
>> > > > >> > 3. How do we load the relevant c library if the
>> python.executable
>> > is
>> > > > >> > provided by users? May there be a risk of version conflicts?
>> > > > >> >
>> > > > >> > Best,
>> > > > >> > Wei
>> > > > >> >
>> > > > >> >
>> > > > >> > > On Dec 29, 2021, at 11:56 AM, Xingbo Huang <hxbks...@gmail.com> wrote:
>> > > > >> > >
>> > > > >> > > Hi everyone,
>> > > > >> > >
>> > > > >> > > I would like to start a discussion thread on "Support PyFlink
>> > > Runtime
>> > > > >> > > Execution in Thread Mode"
>> > > > >> > >
>> > > > >> > > We have provided PyFlink Runtime framework to support Python
>> > > > >> user-defined
>> > > > >> > > functions since Flink 1.10. The PyFlink Runtime framework is
>> > > called
>> > > > >> > Process
>> > > > >> > > Mode, which depends on an inter-process communication
>> > architecture
>> > > > >> based
>> > > > >> > on
>> > > > >> > > the Apache Beam Portability framework. Although starting a
>> > > dedicated
>> > > > >> > > process to execute Python user-defined functions could have
>> > better
>> > > > >> > resource
>> > > > >> > > isolation, it will bring greater resource and performance
>> > > overhead.
>> > > > >> > >
>> > > > >> > > In order to overcome the resource and performance problems on
>> > > Process
>> > > > >> > Mode,
>> > > > >> > > we will propose a new execution mode which executes Python
>> > > > >> user-defined
>> > > > >> > > functions in the same thread instead of a separate process.
>> > > > >> > >
>> > > > >> > > I have drafted the FLIP-206[1]. Please feel free to reply to
>> > this
>> > > > >> email
>> > > > >> > > thread. Looking forward to your feedback!
>> > > > >> > >
>> > > > >> > > Best,
>> > > > >> > > Xingbo
>> > > > >> > >
>> > > > >> > > [1]
>> > > > >> > >
>> > > > >> >
>> > > > >>
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode
>> > > > >> >
>> > > > >> >
>> > > > >>
>> > > > >
>> > >
>> >
>>
>
