If readers are expensive to create, this seems like an important (and not too difficult) optimization.
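
To make the idea concrete, here is a rough sketch of the reader cache Jan proposes below. This is my own illustration, not code from the SDF wrapper: it assumes one cache per DoFn instance and a CheckpointMark type with a meaningful equals(); the readerFor/markTaken names are made up for this sketch.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.sdk.io.UnboundedSource;
    import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
    import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Caches the open reader per source instance, together with the last
    // CheckpointMark taken (as Jan suggests below: better safe than sorry).
    final class ReaderCache<T, MarkT extends CheckpointMark> {
      private static class Entry<T> {
        final UnboundedReader<T> reader;
        final CheckpointMark lastMark;
        Entry(UnboundedReader<T> reader, CheckpointMark lastMark) {
          this.reader = reader;
          this.lastMark = lastMark;
        }
      }

      private final Map<UnboundedSource<T, MarkT>, Entry<T>> cache = new HashMap<>();

      UnboundedReader<T> readerFor(
          UnboundedSource<T, MarkT> source, MarkT mark, PipelineOptions options)
          throws IOException {
        Entry<T> cached = cache.get(source);
        // Happy path: the restriction resumes exactly where the open reader
        // stopped, so createReader can be skipped entirely.
        if (cached != null && cached.lastMark.equals(mark)) {
          return cached.reader;
        }
        // Mark mismatch (e.g. after recovery from a checkpoint): discard the
        // stale reader and recreate from the new mark.
        if (cached != null) {
          cached.reader.close();
        }
        UnboundedReader<T> reader = source.createReader(options, mark);
        cache.put(source, new Entry<>(reader, mark));
        return reader;
      }

      // To be called after each checkpoint is taken, so the next lookup
      // compares against the mark the reader actually stopped at.
      void markTaken(UnboundedSource<T, MarkT> source, MarkT mark) {
        Entry<T> cached = cache.get(source);
        if (cached != null) {
          cache.put(source, new Entry<>(cached.reader, mark));
        }
      }
    }
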
On Mon, Dec 21, 2020 at 11:04 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Boyuan,
>
> I think your analysis is correct - with one exception. It should be
> possible to reuse the reader if and only if the last taken CheckpointMark
> equals the new CheckpointMark the reader would be created from. But this
> equality holds on the happy path and should be satisfied for the vast
> majority of invocations, so it will spare many calls to createReader.
> Actually, it should be non-equal only after recovery from a checkpoint,
> but then there should be no reader. So to be technically correct, we
> should keep the last CheckpointMark along with the open reader, but that
> might turn out to be unnecessary (I'm not sure about that, and I would
> definitely keep the last CheckpointMark, because it is better safe than
> sorry :))
>
> Jan
>
> On 12/21/20 7:54 PM, Boyuan Zhang wrote:
>
>> Hi Jan,
>>
>>> It seems that what we would want is to couple the lifecycle of the
>>> Reader not with the restriction but with the particular instance of the
>>> (Un)boundedSource (after being split). That could be done in the
>>> processing DoFn, if it contained a cache mapping an instance of the
>>> source to the (possibly null, i.e. not yet open) reader. In @NewTracker
>>> we could assign (or create) the reader to the tracker, as the tracker
>>> is created for each restriction.
>>>
>>> WDYT?
>>
>> I was thinking about this, but it seems like it is not applicable to the
>> way UnboundedSource and UnboundedReader work together. Please correct me
>> if I'm wrong. The UnboundedReader is created from the UnboundedSource per
>> CheckpointMark [1], which means that for certain sources the
>> CheckpointMark can affect attributes such as the start position of the
>> reader when resuming. So a single UnboundedSource could be mapped to
>> multiple readers because of different instances of CheckpointMark. That's
>> also the reason why we use the CheckpointMark as the restriction.
>>
>> Please let me know if I misunderstand your suggestion.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
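
(For reference, the factory method Boyuan cites in [1] has roughly this shape - paraphrased from memory, so treat the linked source as authoritative. The reader is always constructed for a particular mark, which is exactly why the cache sketched above has to compare marks before reusing a reader:)

    // Sketch of the UnboundedSource API referenced above: a reader resumes
    // from the given CheckpointMark; a null mark means "start reading fresh".
    public abstract UnboundedReader<OutputT> createReader(
        PipelineOptions options, @Nullable CheckpointMarkT checkpointMark)
        throws IOException;
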
>> On Mon, Dec 21, 2020 at 9:18 AM Antonio Si <antonio...@gmail.com> wrote:
>>
>>> Hi Boyuan,
>>>
>>> Sorry for my late reply. I was off for a few days.
>>>
>>> I didn't use the DirectRunner; I am using the FlinkRunner.
>>>
>>> We measured the number of Kafka messages that we can process per second.
>>> With Beam v2.26 with --experiments=use_deprecated_read and
>>> --fasterCopy=true, we are able to consume 13K messages per second, but
>>> with Beam v2.26 without the use_deprecated_read option, we are only able
>>> to process 10K messages per second for the same pipeline.
>>>
>>> Thanks and regards,
>>>
>>> Antonio.
>>>
>>> On 2020/12/11 22:19:40, Boyuan Zhang <boyu...@google.com> wrote:
>>>
>>>> Hi Antonio,
>>>>
>>>> Thanks for the details! Which version of the Beam SDK are you using?
>>>> And are you using --experiments=beam_fn_api with the DirectRunner to
>>>> launch your pipeline?
>>>>
>>>> ReadFromKafkaDoFn.processElement() takes a Kafka topic+partition as its
>>>> input element; a KafkaConsumer is assigned to that topic+partition and
>>>> then polls records continuously. The DoFn will stop polling and return
>>>> from the process fn when:
>>>>
>>>> - there are no records currently available (this is an SDF feature, the
>>>>   SDF self-initiated checkpoint), or
>>>> - the OutputAndTimeBoundedSplittableProcessElementInvoker issues a
>>>>   checkpoint request to ReadFromKafkaDoFn to get partial results. The
>>>>   checkpoint frequency for the DirectRunner is every 100 output records
>>>>   or every 1 second.
>>>>
>>>> It seems like either the self-initiated checkpoint or the
>>>> DirectRunner-issued checkpoint gives you the performance regression,
>>>> since there is overhead when rescheduling residuals. In your case, it
>>>> looks like the checkpoint behavior of
>>>> OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200
>>>> elements per batch. I want to understand what kind of performance
>>>> regression you are noticing: is it slower to output the same amount of
>>>> records?
>>>>
>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
>>>>
>>>>> Hi Boyuan,
>>>>>
>>>>> This is Antonio. I reported the KafkaIO.read() performance issue on
>>>>> the slack channel a few days ago.
>>>>>
>>>>> I am not sure if this is helpful, but I have been doing some debugging
>>>>> on the SDK KafkaIO performance issue for our pipeline and I would like
>>>>> to provide some observations.
>>>>>
>>>>> It looks like in my case ReadFromKafkaDoFn.processElement() was
>>>>> invoked within the same thread, and every time KafkaConsumer.poll() is
>>>>> called it returns some records, from 1 up to 200, and then proceeds to
>>>>> run the pipeline steps. Each KafkaConsumer.poll() takes about 0.8 ms.
>>>>> So in this case the polling and the running of the pipeline are
>>>>> executed sequentially within a single thread, and after processing a
>>>>> batch of records it needs to wait 0.8 ms before it can process the
>>>>> next batch.
>>>>>
>>>>> Any suggestions would be appreciated.
>>>>>
>>>>> Hope that helps.
>>>>>
>>>>> Thanks and regards,
>>>>>
>>>>> Antonio.
>>>>>
>>>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
>>>>>
>>>>>> Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
>>>>>>
>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>
>>>>>>> Thanks for the pointer, Steve! I'll check it out. The execution
>>>>>>> paths for UnboundedSource and the SDF wrapper are different. It's
>>>>>>> highly possible that the regression comes either from the invocation
>>>>>>> path for the SDF wrapper or from the implementation of the SDF
>>>>>>> wrapper itself.
>>>>>>>
>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:
>>>>>>>
>>>>>>>> Coincidentally, someone else in the ASF slack mentioned [1]
>>>>>>>> yesterday that they were seeing significantly reduced performance
>>>>>>>> using KafkaIO.Read with the SDF wrapper vs. the unbounded source.
>>>>>>>> They mentioned they were using Flink 1.9.
>>>>>>>>
>>>>>>>> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>>>>>>>
>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Steve,
>>>>>>>>>
>>>>>>>>> I think the major performance regression comes from
>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker [1], which
>>>>>>>>> checkpoints the DoFn based on a time/output limit and uses
>>>>>>>>> timers/state to reschedule work.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
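
The checkpointing policy Boyuan describes can be pictured with a small, purely illustrative helper. The names and structure here are mine; the real limits and mechanics live in the class linked above:

    // Illustrative sketch of the "checkpoint after N outputs or T elapsed"
    // policy applied by OutputAndTimeBoundedSplittableProcessElementInvoker.
    final class OutputAndTimeBound {
      private final int maxOutputs;   // e.g. 100 records on the DirectRunner
      private final long maxMillis;   // e.g. 1000 ms on the DirectRunner
      private long startMillis;
      private int outputs;

      OutputAndTimeBound(int maxOutputs, long maxMillis) {
        this.maxOutputs = maxOutputs;
        this.maxMillis = maxMillis;
      }

      void onBundleStart() {
        startMillis = System.currentTimeMillis();
        outputs = 0;
      }

      // Consulted after each emitted element; when it returns true the
      // invoker splits the restriction and reschedules the residual via
      // timers/state - the rescheduling overhead discussed in this thread.
      boolean shouldCheckpointAfterOutput() {
        outputs++;
        return outputs >= maxOutputs
            || System.currentTimeMillis() - startMillis >= maxMillis;
      }
    }
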
>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> I have a pipeline that reads from Pub/Sub, does some aggregation,
>>>>>>>>>> and writes to various places. Previously, in older versions of
>>>>>>>>>> Beam, when running this in the DirectRunner, messages would go
>>>>>>>>>> through the pipeline almost instantly, making it very easy to
>>>>>>>>>> debug locally, etc.
>>>>>>>>>>
>>>>>>>>>> However, after upgrading to Beam 2.25, I noticed that it could
>>>>>>>>>> take on the order of 5-10 minutes for messages to get from the
>>>>>>>>>> Pub/Sub read step to the next step in the pipeline (deserializing
>>>>>>>>>> them, etc). The subscription being read from has on the order of
>>>>>>>>>> 100,000 elements/sec arriving in it.
>>>>>>>>>>
>>>>>>>>>> Setting --experiments=use_deprecated_read fixes it, and makes the
>>>>>>>>>> pipeline behave as it did before.
>>>>>>>>>>
>>>>>>>>>> It seems like the SDF implementation in the DirectRunner here is
>>>>>>>>>> causing some kind of issue, either buffering a very large amount
>>>>>>>>>> of data before emitting it in a bundle, or something else. Has
>>>>>>>>>> anyone else run into this?
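
As a closing note: for anyone who wants to reproduce Antonio's comparison of the two read paths, the flags mentioned in this thread can also be set programmatically. A minimal sketch - only the two flags come from this thread; the runner choice and everything else is illustrative:

    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ReadPathComparison {
      public static void main(String[] args) {
        // Default configuration: the SDF-based read path.
        PipelineOptions sdfRead =
            PipelineOptionsFactory.fromArgs("--runner=FlinkRunner").create();

        // Antonio's faster configuration: the legacy UnboundedSource path.
        PipelineOptions deprecatedRead =
            PipelineOptionsFactory.fromArgs(
                    "--runner=FlinkRunner",
                    "--experiments=use_deprecated_read",
                    "--fasterCopy=true")
                .create();

        // ... build and run the same pipeline once with each options object
        // and compare throughput.
      }
    }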