Hi Thomas,

Thanks for the confirmation. I will now start a vote.

Best,
Xingbo

Thomas Weise <t...@apache.org> 于2022年1月12日周三 02:20写道:

> Hi Xingbo,
>
> +1 from my side
>
> Thanks for the clarification. For your use case the parameter size and
> therefore serialization overhead was the limiting factor. I have seen
> use cases where that is not the concern, because the Python logic
> itself is heavy and dwarfs the protocol overhead (for example when
> interacting with external systems from the UDF). Hence it is good to
> give users options to optimize for their application requirements.
>
> Cheers,
> Thomas
>
> On Tue, Jan 11, 2022 at 3:44 AM Xingbo Huang <hxbks...@gmail.com> wrote:
> >
> > Hi everyone,
> >
> > Thanks to all of you for the discussion.
> > If there are no objections, I would like to start a vote thread tomorrow.
> >
> > Best,
> > Xingbo
> >
> > Xingbo Huang <hxbks...@gmail.com> 于2022年1月7日周五 16:18写道:
> >
> > > Hi Till,
> > >
> > > I have written a more complicated PyFlink job. Compared with the
> previous
> > > single python udf job, there is an extra stage of converting between
> table
> > > and datastream. Besides, I added a python map function for the job.
> Because
> > > python datastream has not yet implemented Thread mode, the python map
> > > function operator is still running in Process Mode.
> > >
> > > ```
> > > source = t_env.from_path("source_table")  # schema [id: String, d:int]
> > >
> > > @udf(result_type=DataTypes.STRING(), func_type="general")
> > > def upper(x):
> > >     return x.upper()
> > >
> > > t_env.create_temporary_system_function("upper", upper)
> > > # python map function
> > > ds = t_env.to_data_stream(source) \
> > >                 .map(lambda x: x, output_type=Types.ROW_NAMED(["id",
> "d"],
> > >
> > >                            [Types.STRING(),
> > >
> > >                             Types.INT()]))
> > >
> > > t = t_env.from_data_stream(ds)
> > > t.select('upper(id)').execute_insert('sink_table')
> > > ```
> > >
> > > The input data size is 1k.
> > >
> > > Mode                                                 |   QPS
> > > Process Mode                                   |    3w
> > > Thread Mode + Process mode         |    4w
> > >
> > > From the table, we can find that the nodes run in Process Mode is the
> > > performance bottleneck of the job.
> > >
> > > Best,
> > > Xingbo
> > >
> > > Till Rohrmann <trohrm...@apache.org> 于2022年1月5日周三 23:16写道:
> > >
> > >> Thanks for the detailed answer Xingbo. Quick question on the last
> figure
> > >> in
> > >> the FLIP. You said that this is a real world Flink stream SQL job. The
> > >> title of the graph says UDF(String Upper). So do I understand
> correctly
> > >> that string upper is the real world use case you have measured? What I
> > >> wanted to ask is how a slightly more complex Flink Python job
> (involving
> > >> shuffles, with back pressure, etc.) performs using the thread and
> process
> > >> mode respectively.
> > >>
> > >> If the mode solely needs changes in the Python part of Flink, then I
> don't
> > >> have any concerns from the runtime perspective.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Wed, Jan 5, 2022 at 1:55 PM Xingbo Huang <hxbks...@gmail.com>
> wrote:
> > >>
> > >> > Hi Till and Thomas,
> > >> >
> > >> > Thanks a lot for joining the discussion.
> > >> >
> > >> > For Till:
> > >> >
> > >> > >>> Is the slower performance currently the biggest pain point for
> our
> > >> > Python users? What else are our Python users mainly complaining
> about?
> > >> >
> > >> > PyFlink users are most concerned about two parts, one is better
> > >> usability,
> > >> > the other is performance. Users often make some benchmarks when they
> > >> > investigate pyflink[1][2] at the beginning to decide whether to use
> > >> > PyFlink. The performance of a PyFlink job depends on two parts, one
> is
> > >> the
> > >> > overhead of the PyFlink framework, and the other is the Python
> function
> > >> > complexity implemented by the user. In the Python ecosystem, there
> are
> > >> many
> > >> > libraries and tools that can help Python users improve the
> performance
> > >> of
> > >> > their custom functions, such as pandas[3], numba[4] and cython[5].
> So we
> > >> > hope that the framework overhead of PyFlink itself can also be
> reduced.
> > >> >
> > >> > >>> Concerning the proposed changes, are there any changes required
> on
> > >> the
> > >> > runtime side (changes to Flink)? How will the deployment and memory
> > >> > management be affected when using the thread execution mode?
> > >> >
> > >> > The changes on PyFlink Runtime mentioned here are actually only
> > >> > modifications of PyFlink custom Operators, such as
> > >> > PythonScalarFunctionOperator[6], which won't affect deployment and
> > >> memory
> > >> > management.
> > >> >
> > >> > >>> One more question that came to my mind: How much performance
> > >> > improvement dowe gain on a real-world Python use case? Were the
> > >> > measurements more like micro benchmarks where the Python UDF was
> called
> > >> w/o
> > >> > the overhead of Flink? I would just be curious how much the Python
> > >> > component contributes to the overall runtime of a real world job.
> Do we
> > >> > have some data on this?
> > >> >
> > >> > The last figure I put in FLIP is the performance comparison of three
> > >> real
> > >> > Flink Stream Sql Jobs. They are a Java UDF job, a Python UDF job in
> > >> Process
> > >> > Mode, and a Python UDF job in Thread Mode. The calculated value of
> QPS
> > >> is
> > >> > the end-to-end Flink job execution result. As shown in the
> performance
> > >> > comparison chart, the performance of Python udf with the same
> function
> > >> can
> > >> > often only reach 20% of Java udf, so the performance of python udf
> will
> > >> > often become the performance bottleneck in a PyFlink job.
> > >> >
> > >> > For Thomas:
> > >> >
> > >> > The first time that I realized the framework overhead of various IPC
> > >> > (socket, grpc, shared memory) cannot be ignored in some scenarios is
> > >> due to
> > >> > an image algorithm prediction job of PyFlink. Its input parameters
> are a
> > >> > series of huge image binary arrays, and its data size is bigger
> than 1G.
> > >> > The performance overhead of serialization/deserialization has
> become an
> > >> > important part of its poor performance. Although this job is a bit
> > >> extreme,
> > >> > through measurement, we did find the impact of the
> > >> > serialization/deserialization overhead caused by larger size
> parameters
> > >> on
> > >> > the performance of the IPC framework.
> > >> >
> > >> > >>> As I understand it, you measured the difference in throughput
> for
> > >> UPPER
> > >> > between process and embedded mode and the difference is 50%
> increased
> > >> > throughput?
> > >> >
> > >> > This 50% is the result when the data size is less than 100byte.
> When the
> > >> > data size reaches 1k, the performance of the Embedded Mode will
> reach
> > >> about
> > >> > 3.5 times the performance of the Process Mode shown in the FLIP.
> When
> > >> the
> > >> > data reaches 1M, the performance of Embedded Mode can reach 5 times
> the
> > >> > performance of the Process Mode. The biggest difference here is
> that in
> > >> > Embedded Mode, input/result data does not need to be
> > >> > serialized/deserialized.
> > >> >
> > >> > >>> Is that a typical UDF in your usage?
> > >> >
> > >> > The reason for choosing UPPER is that a simpler udf implementation
> can
> > >> make
> > >> > it easier to evaluate the performance of different execution modes.
> > >> >
> > >> > >>> What do you observe when the function becomes more complex?
> > >> >
> > >> > We can analyze the QPS of the framework (process mode or embedded
> mode)
> > >> and
> > >> > the QPS of the UDF calculation logic separately. A more complex UDF
> > >> means
> > >> > that it is a UDF with a smaller QPS. The main factors that affect
> the
> > >> > framework QPS are data type of parameters, number of parameters and
> > >> size of
> > >> > parameters, which will greatly affect the
> serialization/deserialization
> > >> > overhead in Process Mode.
> > >> >
> > >> > The purpose of introducing thread mode is not to replace Process
> mode,
> > >> but
> > >> > to supplement Python udf usage scenarios such as cep and join, and
> some
> > >> > scenarios where higher performance is pursued. Compared with Thread
> > >> mode,
> > >> > Process Mode has better isolation, which can solve the limitation of
> > >> thread
> > >> > mode in some scenarios such as session mode.
> > >> >
> > >> > [1]
> https://www.mail-archive.com/user@flink.apache.org/msg42760.html
> > >> > [2]
> https://www.mail-archive.com/user@flink.apache.org/msg44975.html
> > >> > [3] https://pandas.pydata.org/
> > >> > [4] https://cython.org/
> > >> > [5] https://numba.pydata.org/
> > >> > [6]
> > >> >
> > >> >
> > >>
> https://github.com/apache/flink/blob/master/flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/scalar/PythonScalarFunctionOperator.java
> > >> >
> > >> > Best,
> > >> > Xingbo
> > >> >
> > >> > Thomas Weise <t...@apache.org> 于2022年1月4日周二 04:23写道:
> > >> >
> > >> > > Interesting discussion. It caught my attention because I was also
> > >> > > interested in the Beam fn execution overhead a few years ago.
> > >> > >
> > >> > > We found back then that while in theory the fn protocol overhead
> is
> > >> > > very significant, for realistic function workloads that overhead
> was
> > >> > > negligible. And of course it all depends on the use case. It
> might be
> > >> > > worthwhile to quantify a couple more scenarios.
> > >> > >
> > >> > > As I understand it, you measured the difference in throughput for
> > >> > > UPPER between process and embedded mode and the difference is 50%
> > >> > > increased throughput? Is that a typical UDF in your usage? What
> do you
> > >> > > observe when the function becomes more complex?
> > >> > >
> > >> > > Thanks,
> > >> > > Thomas
> > >> > >
> > >> > > On Mon, Jan 3, 2022 at 5:52 AM Till Rohrmann <
> trohrm...@apache.org>
> > >> > wrote:
> > >> > > >
> > >> > > > One more question that came to my mind: How much performance
> > >> > improvement
> > >> > > do
> > >> > > > we gain on a real-world Python use case? Were the measurements
> more
> > >> > like
> > >> > > > micro benchmarks where the Python UDF was called w/o the
> overhead of
> > >> > > Flink?
> > >> > > > I would just be curious how much the Python component
> contributes to
> > >> > the
> > >> > > > overall runtime of a real world job. Do we have some data on
> this?
> > >> > > >
> > >> > > > Cheers,
> > >> > > > Till
> > >> > > >
> > >> > > > On Mon, Jan 3, 2022 at 11:45 AM Till Rohrmann <
> trohrm...@apache.org
> > >> >
> > >> > > wrote:
> > >> > > >
> > >> > > > > Hi Xingbo,
> > >> > > > >
> > >> > > > > Thanks for creating this FLIP. I have two general questions
> about
> > >> the
> > >> > > > > motivation for this FLIP because I have only very little
> exposure
> > >> to
> > >> > > our
> > >> > > > > Python users:
> > >> > > > >
> > >> > > > > Is the slower performance currently the biggest pain point
> for our
> > >> > > Python
> > >> > > > > users?
> > >> > > > >
> > >> > > > > What else are our Python users mainly complaining about?
> > >> > > > >
> > >> > > > > Concerning the proposed changes, are there any changes
> required on
> > >> > the
> > >> > > > > runtime side (changes to Flink)? How will the deployment and
> > >> memory
> > >> > > > > management be affected when using the thread execution mode?
> > >> > > > >
> > >> > > > > Cheers,
> > >> > > > > Till
> > >> > > > >
> > >> > > > > On Fri, Dec 31, 2021 at 9:46 AM Xingbo Huang <
> hxbks...@gmail.com>
> > >> > > wrote:
> > >> > > > >
> > >> > > > >> Hi Wei,
> > >> > > > >>
> > >> > > > >> Thanks a lot for your feedback. Very good questions!
> > >> > > > >>
> > >> > > > >> >>> 1. It seems that we dynamically load an embedded Python
> and
> > >> user
> > >> > > > >> dependencies in the TM process. Can they be uninstalled
> cleanly
> > >> > after
> > >> > > the
> > >> > > > >> task finished? i.e. Can we use the Thread Mode in session
> mode
> > >> and
> > >> > > Pyflink
> > >> > > > >> shell?
> > >> > > > >>
> > >> > > > >> I mentioned the limitation of this part in FLIP. There is no
> > >> problem
> > >> > > > >> without changing the python interpreter, but if you need to
> > >> change
> > >> > the
> > >> > > > >> python interpreter, there is really no way to reload the
> Python
> > >> > > library.
> > >> > > > >> The problem is mainly caused by many Python libraries having
> an
> > >> > > assumption
> > >> > > > >> that they own the process alone.
> > >> > > > >>
> > >> > > > >> >>> 2. Does one TM have only one embedded Python running at
> the
> > >> same
> > >> > > time?
> > >> > > > >> If all the Python operator in the TM share the same PVM, will
> > >> there
> > >> > > be a
> > >> > > > >> loss in performance?
> > >> > > > >>
> > >> > > > >> Your understanding is correct that one TM have only one
> embedded
> > >> > > Python
> > >> > > > >> running at the same time. I guess you are worried about the
> > >> > > performance
> > >> > > > >> loss of multi threads caused by Python GIL. There is a
> one-to-one
> > >> > > > >> correspondence between Java worker thread and Python
> > >> > subinterpreters.
> > >> > > > >> Although the subinterpreters has not yet completely overcome
> the
> > >> GIL
> > >> > > > >> sharing problem(The Python community’s recent plan for a
> > >> > > per-interpreter
> > >> > > > >> GIL is also under discussion[1]), the performance of
> > >> subinterpreters
> > >> > > is
> > >> > > > >> very close to that of multiprocessing [2].
> > >> > > > >>
> > >> > > > >> >>> 3. How do we load the relevant c library if the
> > >> > python.executable
> > >> > > is
> > >> > > > >> provided by users?
> > >> > > > >>
> > >> > > > >> Once python.executable is provided, PEMJA will dynamically
> load
> > >> the
> > >> > > > >> CPython
> > >> > > > >> library (libpython.*so or libpython.*dylib) and pemja.so
> > >> installed
> > >> > in
> > >> > > the
> > >> > > > >> python environment.
> > >> > > > >>
> > >> > > > >> >>> May there be a risk of version conflicts?
> > >> > > > >>
> > >> > > > >> I understand that this question is actually discussing
> whether
> > >> C/C++
> > >> > > has a
> > >> > > > >> way to solve the problem of relying on different versions of
> a
> > >> > > library.
> > >> > > > >> First of all, we know that if there is only static linking,
> there
> > >> > > will be
> > >> > > > >> no such problem.  And I have studied the source code of
> > >> CPython[3],
> > >> > > and
> > >> > > > >> there is no usage of dynamic linking. The rest is the case
> where
> > >> > > dynamic
> > >> > > > >> linking is used in the C library written by the users. There
> are
> > >> > many
> > >> > > ways
> > >> > > > >> to solve this problem with dynamic linking, but after all,
> this
> > >> > > library is
> > >> > > > >> written by users, and it is difficult for us to guarantee
> that
> > >> there
> > >> > > will
> > >> > > > >> be no conflicts. At this time, Process Mode will be the
> choice of
> > >> > falk
> > >> > > > >> back.
> > >> > > > >>
> > >> > > > >> [1]
> > >> > > > >>
> > >> > > > >>
> > >> > >
> > >> >
> > >>
> https://mail.python.org/archives/list/python-...@python.org/thread/S5GZZCEREZLA2PEMTVFBCDM52H4JSENR/#RIK75U3ROEHWZL4VENQSQECB4F4GDELV
> > >> > > > >> [2]
> > >> > > > >>
> > >> > > > >>
> > >> > >
> > >> >
> > >>
> https://mail.python.org/archives/list/python-...@python.org/thread/PNLBJBNIQDMG2YYGPBCTGOKOAVXRBJWY/#L5OXHXPFONRKLR3W6U46LUSUIBN4FCZQ
> > >> > > > >> [3] https://github.com/python/cpython
> > >> > > > >>
> > >> > > > >> Best,
> > >> > > > >> Xingbo
> > >> > > > >>
> > >> > > > >> Wei Zhong <weizhong0...@gmail.com> 于2021年12月31日周五 11:49写道:
> > >> > > > >>
> > >> > > > >> > Hi Xingbo,
> > >> > > > >> >
> > >> > > > >> > Thanks for creating this FLIP. Big +1 for it!
> > >> > > > >> >
> > >> > > > >> > I have some question about the Thread Mode:
> > >> > > > >> >
> > >> > > > >> > 1. It seems that we dynamically load an embedded Python and
> > >> user
> > >> > > > >> > dependencies in the TM process. Can they be uninstalled
> cleanly
> > >> > > after
> > >> > > > >> the
> > >> > > > >> > task finished? i.e. Can we use the Thread Mode in session
> mode
> > >> and
> > >> > > > >> Pyflink
> > >> > > > >> > shell?
> > >> > > > >> >
> > >> > > > >> > 2. Does one TM have only one embedded Python running at the
> > >> same
> > >> > > time?
> > >> > > > >> If
> > >> > > > >> > all the Python operator in the TM share the same PVM, will
> > >> there
> > >> > be
> > >> > > a
> > >> > > > >> loss
> > >> > > > >> > in performance?
> > >> > > > >> >
> > >> > > > >> > 3. How do we load the relevant c library if the
> > >> python.executable
> > >> > is
> > >> > > > >> > provided by users? May there be a risk of version
> conflicts?
> > >> > > > >> >
> > >> > > > >> > Best,
> > >> > > > >> > Wei
> > >> > > > >> >
> > >> > > > >> >
> > >> > > > >> > > 2021年12月29日 上午11:56,Xingbo Huang <hxbks...@gmail.com>
> 写道:
> > >> > > > >> > >
> > >> > > > >> > > Hi everyone,
> > >> > > > >> > >
> > >> > > > >> > > I would like to start a discussion thread on "Support
> PyFlink
> > >> > > Runtime
> > >> > > > >> > > Execution in Thread Mode"
> > >> > > > >> > >
> > >> > > > >> > > We have provided PyFlink Runtime framework to support
> Python
> > >> > > > >> user-defined
> > >> > > > >> > > functions since Flink 1.10. The PyFlink Runtime
> framework is
> > >> > > called
> > >> > > > >> > Process
> > >> > > > >> > > Mode, which depends on an inter-process communication
> > >> > architecture
> > >> > > > >> based
> > >> > > > >> > on
> > >> > > > >> > > the Apache Beam Portability framework. Although starting
> a
> > >> > > dedicated
> > >> > > > >> > > process to execute Python user-defined functions could
> have
> > >> > better
> > >> > > > >> > resource
> > >> > > > >> > > isolation, it will bring greater resource and performance
> > >> > > overhead.
> > >> > > > >> > >
> > >> > > > >> > > In order to overcome the resource and performance
> problems on
> > >> > > Process
> > >> > > > >> > Mode,
> > >> > > > >> > > we will propose a new execution mode which executes
> Python
> > >> > > > >> user-defined
> > >> > > > >> > > functions in the same thread instead of a separate
> process.
> > >> > > > >> > >
> > >> > > > >> > > I have drafted the FLIP-206[1]. Please feel free to
> reply to
> > >> > this
> > >> > > > >> email
> > >> > > > >> > > thread. Looking forward to your feedback!
> > >> > > > >> > >
> > >> > > > >> > > Best,
> > >> > > > >> > > Xingbo
> > >> > > > >> > >
> > >> > > > >> > > [1]
> > >> > > > >> > >
> > >> > > > >> >
> > >> > > > >>
> > >> > >
> > >> >
> > >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-206%3A+Support+PyFlink+Runtime+Execution+in+Thread+Mode
> > >> > > > >> >
> > >> > > > >> >
> > >> > > > >>
> > >> > > > >
> > >> > >
> > >> >
> > >>
> > >
>

Reply via email to