Hi Boyuan,

Sorry for my late reply. I was off for a few days.

I didn't use DirectRunner. I am using FlinkRunner.

We measured the number of Kafka messages that we can process per second.
With Beam v2.26 with --experiments=use_deprecated_read and --fasterCopy=true,
we are able to consume 13K messages per second, but with Beam v2.26
without the use_deprecated_read option, we are only able to process 10K
messages per second for the same pipeline.
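
For reference, the pipeline is launched roughly like this (a minimal sketch;
the class name, topic, and bootstrap servers are illustrative placeholders,
not our real config):

  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.kafka.KafkaIO;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.kafka.common.serialization.StringDeserializer;

  public class KafkaThroughputPipeline {
    public static void main(String[] args) {
      // The two runs differ only in whether use_deprecated_read is passed.
      PipelineOptions options = PipelineOptionsFactory.fromArgs(
          "--runner=FlinkRunner",
          "--fasterCopy=true",
          "--experiments=use_deprecated_read").create();
      Pipeline p = Pipeline.create(options);
      p.apply(KafkaIO.<String, String>read()
          .withBootstrapServers("localhost:9092")  // placeholder
          .withTopic("events")                     // placeholder
          .withKeyDeserializer(StringDeserializer.class)
          .withValueDeserializer(StringDeserializer.class));
      // Downstream steps omitted; they are identical in both runs.
      p.run().waitUntilFinish();
    }
  }

Without the experiment, the read expands through the new SDF machinery;
with it, the old UnboundedSource translation is used.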

Thanks and regards,

Antonio.

On 2020/12/11 22:19:40, Boyuan Zhang <boyu...@google.com> wrote: 
> Hi Antonio,
> 
> Thanks for the details! Which version of Beam SDK are you using? And are
> you using --experiments=beam_fn_api with DirectRunner to launch your
> pipeline?
> 
> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
> topic+partition as the input element, and a KafkaConsumer will be assigned
> to this topic+partition to poll records continuously. The Kafka consumer
> will return from the process fn, to be resumed later from a checkpoint,
> when either of the following happens (see the sketch after the list):
> 
>    - There are no records currently available (this is a feature of SDF
>    called a self-initiated checkpoint).
>    - The OutputAndTimeBoundedSplittableProcessElementInvoker issues a
>    checkpoint request to ReadFromKafkaDoFn to get partial results. The
>    checkpoint frequency for DirectRunner is every 100 output records or
>    every 1 second.
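> 
> To make that concrete, the loop follows roughly this pattern (a trimmed
> sketch of the idea, not the exact ReadFromKafkaDoFn code; consumer and
> pollTimeout are fields set up elsewhere, and toKafkaRecord() is an
> illustrative helper):
> 
>   @ProcessElement
>   public ProcessContinuation processElement(
>       @Element KafkaSourceDescriptor descriptor,
>       RestrictionTracker<OffsetRange, Long> tracker,
>       OutputReceiver<KafkaRecord<K, V>> receiver) {
>     while (true) {
>       // Poll the consumer assigned to this topic+partition.
>       ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(pollTimeout);
>       if (rawRecords.isEmpty()) {
>         // No records available right now: SDF self-initiated checkpoint.
>         // The runner reschedules the remaining restriction later.
>         return ProcessContinuation.resume();
>       }
>       for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
>         if (!tracker.tryClaim(rawRecord.offset())) {
>           // A checkpoint was taken, e.g. by the invoker's time/output
>           // limit; stop and let the runner schedule the residual.
>           return ProcessContinuation.stop();
>         }
>         receiver.output(toKafkaRecord(rawRecord));  // illustrative helper
>       }
>     }
>   }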
> 
> It seems like either the self-initiated checkpoint or the DirectRunner-issued
> checkpoint gives you the performance regression, since there is overhead
> when rescheduling residuals. In your case, it seems more likely that the
> checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker
> gives you 200 elements per batch. I want to understand what kind of
> performance regression you are noticing: is it slower to output the same
> amount of records?
> 
> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
> 
> > Hi Boyuan,
> >
> > This is Antonio. I reported the KafkaIO.read() performance issue on the
> > slack channel a few days ago.
> >
> > I am not sure if this is helpful, but I have been doing some debugging on
> > the SDK KafkaIO performance issue for our pipeline and I would like to
> > provide some observations.
> >
> > It looks like in my case ReadFromKafkaDoFn.processElement() was
> > invoked within the same thread, and every time kafkaConsumer.poll() is
> > called it returns some records, from 1 up to 200, after which the
> > pipeline steps run on that batch. Each kafkaConsumer.poll() takes about
> > 0.8 ms, so the polling and the running of the pipeline are executed
> > sequentially within a single thread: after processing a batch of
> > records, it has to wait about 0.8 ms for the next poll before it can
> > process the next batch.
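> >
> > For reference, a standalone way to time a single poll() looks roughly
> > like this (a sketch; servers, topic, and group id are placeholders):
> >
> >   import java.time.Duration;
> >   import java.util.Collections;
> >   import java.util.Properties;
> >   import org.apache.kafka.clients.consumer.ConsumerRecords;
> >   import org.apache.kafka.clients.consumer.KafkaConsumer;
> >   import org.apache.kafka.common.TopicPartition;
> >   import org.apache.kafka.common.serialization.ByteArrayDeserializer;
> >
> >   public class PollLatency {
> >     public static void main(String[] args) {
> >       Properties props = new Properties();
> >       props.put("bootstrap.servers", "localhost:9092");  // placeholder
> >       props.put("group.id", "poll-latency-test");        // placeholder
> >       props.put("key.deserializer", ByteArrayDeserializer.class.getName());
> >       props.put("value.deserializer", ByteArrayDeserializer.class.getName());
> >       try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
> >         consumer.assign(Collections.singletonList(new TopicPartition("events", 0)));
> >         long start = System.nanoTime();
> >         ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
> >         System.out.printf("poll() returned %d records in %.2f ms%n",
> >             records.count(), (System.nanoTime() - start) / 1e6);
> >       }
> >     }
> >   }
> >
> > As a back-of-envelope check: at ~0.8 ms per poll and at most 200 records
> > per poll, polling alone would allow roughly 250K records per second, which
> > would suggest most of the time per batch goes to the pipeline steps and
> > checkpoint overhead rather than to the poll itself.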
> >
> > Any suggestions would be appreciated.
> >
> > Hope that helps.
> >
> > Thanks and regards,
> >
> > Antonio.
> >
> > On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
> > > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
> > >
> > > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> wrote:
> > >
> > > > Thanks for the pointer, Steve! I'll check it out. The execution paths for
> > > > UnboundedSource and the SDF wrapper are different. It's highly possible that
> > > > the regression either comes from the invocation path for the SDF wrapper or
> > > > from the implementation of the SDF wrapper itself.
> > > >
> > > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:
> > > >
> > > >> Coincidentally, someone else in the ASF slack mentioned [1] yesterday
> > > >> that they were seeing significantly reduced performance using KafkaIO.Read
> > > >> w/ the SDF wrapper vs the unbounded source.  They mentioned they were
> > > >> using Flink 1.9.
> > > >>
> > > >> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
> > > >>
> > > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:
> > > >>
> > > >>> Hi Steve,
> > > >>>
> > > >>> I think the major performance regression comes from
> > > >>> OutputAndTimeBoundedSplittableProcessElementInvoker [1], which will
> > > >>> checkpoint the DoFn based on a time/output limit and use timers/state to
> > > >>> reschedule work.
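> > > >>>
> > > >>> Conceptually, the check it applies is something like this (a heavily
> > > >>> simplified illustration, not the actual invoker code; the limits are
> > > >>> illustrative):
> > > >>>
> > > >>>   import java.time.Duration;
> > > >>>
> > > >>>   class CheckpointBound {
> > > >>>     // Force a checkpoint once either the output bound or the time
> > > >>>     // bound is hit; the residual restriction is then rescheduled
> > > >>>     // through timers/state, which is where the overhead comes from.
> > > >>>     static boolean shouldCheckpoint(long outputsSoFar, Duration elapsed) {
> > > >>>       final long maxOutputs = 100;                    // illustrative
> > > >>>       final Duration maxTime = Duration.ofSeconds(1); // illustrative
> > > >>>       return outputsSoFar >= maxOutputs
> > > >>>           || elapsed.compareTo(maxTime) >= 0;
> > > >>>     }
> > > >>>   }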
> > > >>>
> > > >>> [1]
> > > >>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
> > > >>>
> > > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org>
> > > >>> wrote:
> > > >>>
> > > >>>> I have a pipeline that reads from pubsub, does some aggregation, and
> > > >>>> writes to various places.  Previously, in older versions of beam, when
> > > >>>> running this in the DirectRunner, messages would go through the pipeline
> > > >>>> almost instantly, making it very easy to debug locally, etc.
> > > >>>>
> > > >>>> However, after upgrading to beam 2.25, I noticed that it could take on
> > > >>>> the order of 5-10 minutes for messages to get from the pubsub read step
> > > >>>> to the next step in the pipeline (deserializing them, etc).  The
> > > >>>> subscription being read from has on the order of 100,000 elements/sec
> > > >>>> arriving in it.
> > > >>>>
> > > >>>> Setting --experiments=use_deprecated_read fixes it, and makes the
> > > >>>> pipeline behave as it did before.
> > > >>>>
> > > >>>> It seems like the SDF implementation in the DirectRunner here is
> > > >>>> causing some kind of issue, either buffering a very large amount of
> > > >>>> data before emitting it in a bundle, or something else.  Has anyone
> > > >>>> else run into this?
> > > >>>>
> > > >>>
> > >
> >
> 
