Hi Boyuan,

Sorry for my late reply. I was off for a few days.

I didn't use DirectRunner. I am using FlinkRunner.

We measured the number of Kafka messages that we can process per second.
With Beam v2.26 with --experiments=use_deprecated_read and --fasterCopy=true,
we are able to consume 13K messages per second, but with Beam v2.26
without the use_deprecated_read option, we are only able to process 10K
messages per second for the same pipeline.

Thanks and regards,

Antonio.

On 2020/12/11 22:19:40, Boyuan Zhang <boyu...@google.com> wrote:
> Hi Antonio,
>
> Thanks for the details! Which version of Beam SDK are you using? And are
> you using --experiments=beam_fn_api with DirectRunner to launch your
> pipeline?
>
> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
> topic+partition as input element and a KafkaConsumer will be assigned to
> this topic+partition then poll records continuously. The Kafka consumer
> will resume reading and return from the process fn when
>
>    - There are no available records currently (this is a feature of SDF
>    which calls SDF self-initiated checkpoint)
>    - The OutputAndTimeBoundedSplittableProcessElementInvoker issues
>    checkpoint request to ReadFromKafkaDoFn for getting partial results. The
>    checkpoint frequency for DirectRunner is every 100 output records or every
>    1 second.
>
> It seems like either the self-initiated checkpoint or DirectRunner issued
> checkpoint gives you the performance regression since there is overhead
> when rescheduling residuals. In your case, it's more likely that the
> checkpoint behavior of OutputAndTimeBoundedSplittableProcessElementInvoker
> gives you 200 elements a batch. I want to understand what kind of
> performance regression you are noticing? Is it slower to output the same
> amount of records?
>
> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
>
> > Hi Boyuan,
> >
> > This is Antonio. I reported the KafkaIO.read() performance issue on the
> > slack channel a few days ago.
> >
> > I am not sure if this is helpful, but I have been doing some debugging on
> > the SDK KafkaIO performance issue for our pipeline and I would like to
> > provide some observations.
> >
> > It looks like in my case the ReadFromKafkaDoFn.processElement() was
> > invoked within the same thread and every time kafkaconsumer.poll() is
> > called, it returns some records, from 1 up to 200 records. So, it will
> > proceed to run the pipeline steps. Each kafkaconsumer.poll() takes about
> > 0.8ms. So, in this case, the polling and running of the pipeline are
> > executed sequentially within a single thread. So, after processing a batch
> > of records, it will need to wait for 0.8ms before it can process the next
> > batch of records again.
> >
> > Any suggestions would be appreciated.
> >
> > Hope that helps.
> >
> > Thanks and regards,
> >
> > Antonio.
> >
> > On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
> > > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
> > >
> > > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> wrote:
> > >
> > > > Thanks for the pointer, Steve! I'll check it out. The execution paths for
> > > > UnboundedSource and SDF wrapper are different. It's highly possible that
> > > > the regression either comes from the invocation path for SDF wrapper, or
> > > > the implementation of SDF wrapper itself.
> > > >
> > > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:
> > > >
> > > >> Coincidentally, someone else in the ASF slack mentioned [1] yesterday
> > > >> that they were seeing significantly reduced performance using KafkaIO.Read
> > > >> w/ the SDF wrapper vs the unbounded source. They mentioned they were using
> > > >> flink 1.9.
> > > >>
> > > >> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
> > > >>
> > > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:
> > > >>
> > > >>> Hi Steve,
> > > >>>
> > > >>> I think the major performance regression comes from
> > > >>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will
> > > >>> checkpoint the DoFn based on time/output limit and use timers/state to
> > > >>> reschedule work.
> > > >>>
> > > >>> [1]
> > > >>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
> > > >>>
> > > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org> wrote:
> > > >>>
> > > >>>> I have a pipeline that reads from pubsub, does some aggregation, and
> > > >>>> writes to various places. Previously, in older versions of beam, when
> > > >>>> running this in the DirectRunner, messages would go through the pipeline
> > > >>>> almost instantly, making it very easy to debug locally, etc.
> > > >>>>
> > > >>>> However, after upgrading to beam 2.25, I noticed that it could take on
> > > >>>> the order of 5-10 minutes for messages to get from the pubsub read step to
> > > >>>> the next step in the pipeline (deserializing them, etc). The subscription
> > > >>>> being read from has on the order of 100,000 elements/sec arriving in it.
> > > >>>>
> > > >>>> Setting --experiments=use_deprecated_read fixes it, and makes the
> > > >>>> pipeline behave as it did before.
> > > >>>>
> > > >>>> It seems like the SDF implementation in the DirectRunner here is
> > > >>>> causing some kind of issue, either buffering a very large amount of data
> > > >>>> before emitting it in a bundle, or something else. Has anyone else run
> > > >>>> into this?
> > > >>>>
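
The single-threaded poll-then-process observation in this thread can be illustrated with a rough back-of-the-envelope model. This is only a sketch, not Beam code: the function name `sequential_throughput`, the per-record cost, and the rescheduling cost are hypothetical values chosen for illustration. Only the ~0.8 ms poll time and the 1 to 200 record batch sizes come from the messages above.

```python
def sequential_throughput(batch_size, poll_seconds, per_record_seconds,
                          reschedule_seconds=0.0):
    """Records per second when a single thread polls, processes the whole
    batch, and then pays a fixed rescheduling cost before polling again."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    cycle_seconds = (poll_seconds
                     + batch_size * per_record_seconds
                     + reschedule_seconds)
    return batch_size / cycle_seconds


POLL_SECONDS = 0.0008          # ~0.8 ms per poll, as reported in the thread
PER_RECORD_SECONDS = 0.00007   # hypothetical per-record pipeline cost
RESCHEDULE_SECONDS = 0.001     # hypothetical checkpoint/reschedule overhead

if __name__ == "__main__":
    for batch_size in (1, 10, 100, 200):
        plain = sequential_throughput(
            batch_size, POLL_SECONDS, PER_RECORD_SECONDS)
        with_checkpoint = sequential_throughput(
            batch_size, POLL_SECONDS, PER_RECORD_SECONDS, RESCHEDULE_SECONDS)
        print(f"batch={batch_size:>3}  no-overhead ~{plain:,.0f} rec/s  "
              f"with-overhead ~{with_checkpoint:,.0f} rec/s")
```

Under these assumptions, the fixed per-cycle costs (the poll and any rescheduling) dominate when batches are small and are amortized when batches are large. That is consistent with the suggestion in the thread that frequent checkpoints add overhead, though the real numbers depend on the pipeline and runner.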