Hi Boyuan,

This is Antonio. I reported the KafkaIO.read() performance issue on the Slack
channel a few days ago.

I am not sure if this is helpful, but I have been doing some debugging on the 
SDK KafkaIO performance issue for our pipeline and I would like to provide some 
observations.

It looks like, in my case, ReadFromKafkaDoFn.processElement() was invoked on a
single thread. Each call to kafkaConsumer.poll() returned a batch of records
(anywhere from 1 up to 200), and the pipeline steps were then run on that batch.
Each kafkaConsumer.poll() call takes about 0.8 ms. So polling and running the
pipeline happen sequentially within a single thread: after processing a batch
of records, it has to wait about 0.8 ms for the next poll before it can process
the next batch.
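To make the sequential pattern concrete, here is a minimal, hypothetical Java
sketch. It is not Beam or Kafka code: `PollLoopSketch`, `sequential`,
`pipelined`, and the two cost functions are illustrative names, and `poll` is a
stand-in `Supplier` for something like kafkaConsumer.poll(), with an empty batch
assumed to mark end of input. `sequential` mirrors what I observed (poll,
process, poll again on one thread); `pipelined` is one possible alternative in
which a background thread fetches the next batch while the current one is
processed. The cost functions are an idealized model that ignores thread
hand-off and JVM overheads.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical illustration only; not Beam or Kafka code. */
final class PollLoopSketch {

  /** One thread: poll, process the batch, then poll again. Poll latency is paid between batches. */
  static void sequential(Supplier<List<String>> poll, Consumer<List<String>> process) {
    List<String> batch;
    // An empty batch is treated as end of input (an assumption for this sketch).
    while (!(batch = poll.get()).isEmpty()) {
      process.accept(batch);
    }
  }

  /**
   * A background thread polls into a one-slot queue, so fetching the next batch overlaps
   * processing of the current one. Error propagation from the poll thread is omitted.
   */
  static void pipelined(Supplier<List<String>> poll, Consumer<List<String>> process)
      throws InterruptedException {
    BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(1);
    Thread poller = new Thread(() -> {
      try {
        List<String> polled;
        do {
          polled = poll.get();
          queue.put(polled); // blocks while a previously polled batch is still queued
        } while (!polled.isEmpty());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    poller.start();
    List<String> batch;
    while (!(batch = queue.take()).isEmpty()) {
      process.accept(batch);
    }
    poller.join();
  }

  /** Idealized time for n batches when each poll and each batch's processing have fixed costs. */
  static double sequentialMillis(int n, double pollMs, double processMs) {
    return n * (pollMs + processMs);
  }

  /** Idealized two-stage pipeline (assumes n >= 1): first poll, n - 1 overlapped steps, last process. */
  static double pipelinedMillis(int n, double pollMs, double processMs) {
    return pollMs + (n - 1) * Math.max(pollMs, processMs) + processMs;
  }
}
```

For example, with the 0.8 ms poll latency above and an assumed 0.2 ms of
processing per batch, the model gives 100 ms for 100 batches sequentially versus
about 80.2 ms pipelined. Note that KafkaConsumer is not safe for multi-threaded
access, so in a real pipelined reader the poll thread would have to own the
consumer exclusively.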

Any suggestions would be appreciated.

Hope that helps.

Thanks and regards,

Antonio.

On 2020/12/04 19:17:46, Boyuan Zhang <[email protected]> wrote: 
> Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
> 
> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <[email protected]> wrote:
> 
> > Thanks for the pointer, Steve! I'll check it out. The execution paths for
> > UnboundedSource and SDF wrapper are different. It's highly possible that
> > the regression either comes from the invocation path for SDF wrapper, or
> > the implementation of SDF wrapper itself.
> >
> > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <[email protected]> wrote:
> >
> >> Coincidentally, someone else in the ASF slack mentioned [1] yesterday
> >> that they were seeing significantly reduced performance using KafkaIO.Read
> >> w/ the SDF wrapper vs the unbounded source. They mentioned they were using
> >> Flink 1.9.
> >>
> >> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
> >>
> >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <[email protected]> wrote:
> >>
> >>> Hi Steve,
> >>>
> >>> I think the major performance regression comes from
> >>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will
> >>> checkpoint the DoFn based on time/output limits and use timers/state to
> >>> reschedule work.
> >>>
> >>> [1]
> >>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
> >>>
> >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <[email protected]>
> >>> wrote:
> >>>
> >>>> I have a pipeline that reads from pubsub, does some aggregation, and
> >>>> writes to various places. Previously, in older versions of Beam, when
> >>>> running this in the DirectRunner, messages would go through the pipeline
> >>>> almost instantly, making it very easy to debug locally, etc.
> >>>>
> >>>> However, after upgrading to Beam 2.25, I noticed that it could take on
> >>>> the order of 5-10 minutes for messages to get from the pubsub read step
> >>>> to the next step in the pipeline (deserializing them, etc.). The
> >>>> subscription being read from has on the order of 100,000 elements/sec
> >>>> arriving in it.
> >>>>
> >>>> Setting --experiments=use_deprecated_read fixes it, and makes the
> >>>> pipeline behave as it did before.
> >>>>
> >>>> It seems like the SDF implementation in the DirectRunner here is
> >>>> causing some kind of issue, either buffering a very large amount of data
> >>>> before emitting it in a bundle, or something else.  Has anyone else run
> >>>> into this?
> >>>>
> >>>
> 
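Regarding the OutputAndTimeBoundedSplittableProcessElementInvoker explanation in
the quoted thread: as described there, each invocation processes part of the
restriction, stops (checkpoints) once a time or output limit is hit, and the
remainder is rescheduled via timers/state. The Java sketch below models only
that control flow, under stated assumptions: it is not Beam's implementation,
the restriction is a plain offset range [from, to), the clock is injected so the
behaviour is deterministic, and `BoundedInvokerSketch`, `invoke`, and
`countInvocations` are hypothetical names. Each extra invocation stands for one
checkpoint-and-reschedule round trip, which is where overhead would accumulate
if the limits are small relative to the incoming rate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch of output- and time-bounded processing with checkpointing.
 * Not Beam's OutputAndTimeBoundedSplittableProcessElementInvoker; names are illustrative.
 */
final class BoundedInvokerSketch {

  /** Outcome of one bounded invocation: emitted outputs plus the unprocessed residual range. */
  static final class Result {
    final List<Long> outputs;
    final long residualFrom; // first offset not yet processed
    final long residualTo;   // exclusive end of the range

    Result(List<Long> outputs, long residualFrom, long residualTo) {
      this.outputs = outputs;
      this.residualFrom = residualFrom;
      this.residualTo = residualTo;
    }

    boolean done() {
      return residualFrom >= residualTo;
    }
  }

  /**
   * Processes offsets in [from, to), emitting one output per offset, and stops early once
   * maxOutputs outputs were emitted or maxDurationMillis elapsed. At least one offset is
   * processed per call so that repeated invocations always make progress.
   */
  static Result invoke(
      long from, long to, int maxOutputs, long maxDurationMillis, LongSupplier clockMillis) {
    long start = clockMillis.getAsLong();
    List<Long> outputs = new ArrayList<>();
    long next = from;
    while (next < to) {
      outputs.add(next); // stand-in for emitting the record at this offset
      next++;
      boolean outputLimitHit = outputs.size() >= maxOutputs;
      boolean timeLimitHit = clockMillis.getAsLong() - start >= maxDurationMillis;
      if (outputLimitHit || timeLimitHit) {
        break; // checkpoint: [next, to) becomes the residual to reschedule
      }
    }
    return new Result(outputs, next, to);
  }

  /** Counts how many invocations (checkpoint/reschedule round trips) a whole range needs. */
  static int countInvocations(
      long from, long to, int maxOutputs, long maxDurationMillis, LongSupplier clockMillis) {
    int calls = 0;
    long cursor = from;
    while (cursor < to) {
      Result r = invoke(cursor, to, maxOutputs, maxDurationMillis, clockMillis);
      cursor = r.residualFrom;
      calls++;
    }
    return calls;
  }
}
```

For instance, with a clock that never advances and maxOutputs = 100, a range of
250 offsets takes three invocations (100, 100, then 50 outputs).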
