I can guess that the same issues mentioned here will probably affect
usability for people trying Beam's interactive SQL on Unbounded IO too.

We should take into account that the performance of the SDF-based path
should be as good as or better than the previous version before
considering its removal (--experiments=use_deprecated_read), and we
should probably have consensus when this happens.
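For reference, a minimal sketch of opting back into the pre-SDF read
path with that experiment flag. PipelineOptionsFactory and Pipeline are
the standard Beam APIs; the class name MyKafkaPipeline and the pipeline
contents are hypothetical:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class MyKafkaPipeline {
      public static void main(String[] args) {
        // Passing e.g. args = {"--experiments=use_deprecated_read"}
        // keeps KafkaIO/PubsubIO on the legacy UnboundedSource
        // execution path instead of the SDF wrapper.
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);
        // ... build the read/aggregate/write pipeline as usual ...
        p.run().waitUntilFinish();
      }
    }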
On Fri, Dec 11, 2020 at 11:33 PM Boyuan Zhang <boyu...@google.com> wrote:
>
> > From what I've seen, the direct runner initiates a checkpoint after
> > every element output.
>
> That seems like the 1 second limit kicks in before the output reaches
> 100 elements.
>
> I think the original purpose of having DirectRunner use a small limit
> on issuing checkpoint requests was to exercise SDF better on a small
> data set. But it brings overhead on a larger set owing to too many
> checkpoints. It would be ideal to make this limit configurable from
> the pipeline, but the easiest approach is to figure out a number that
> works for most common cases. Do you think raising the limit to 1000
> elements, or checkpointing every 5 seconds, would help?
>
> On Fri, Dec 11, 2020 at 2:22 PM Steve Niemitz <sniem...@apache.org> wrote:
>>
>> From what I've seen, the direct runner initiates a checkpoint after
>> every element output.
>>
>> On Fri, Dec 11, 2020 at 5:19 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>
>>> Hi Antonio,
>>>
>>> Thanks for the details! Which version of the Beam SDK are you using?
>>> And are you using --experiments=beam_fn_api with DirectRunner to
>>> launch your pipeline?
>>>
>>> ReadFromKafkaDoFn.processElement() takes a Kafka topic+partition as
>>> its input element; a KafkaConsumer is assigned to that
>>> topic+partition and then polls records continuously. The DoFn stops
>>> polling and returns from the process fn (to be resumed later) when:
>>>
>>> - there are currently no available records (this is an SDF feature,
>>>   the so-called self-initiated checkpoint), or
>>> - the OutputAndTimeBoundedSplittableProcessElementInvoker issues a
>>>   checkpoint request to ReadFromKafkaDoFn to get partial results.
>>>   The checkpoint frequency for DirectRunner is every 100 output
>>>   records or every 1 second.
>>>
>>> It seems that either the self-initiated checkpoint or the
>>> DirectRunner-issued checkpoint gives you the performance regression,
>>> since there is overhead when rescheduling residuals. In your case,
>>> it looks like the checkpoint behavior of
>>> OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200
>>> elements per batch. I want to understand what kind of performance
>>> regression you are noticing. Is it slower to output the same amount
>>> of records?
>>>
>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
>>>>
>>>> Hi Boyuan,
>>>>
>>>> This is Antonio. I reported the KafkaIO.read() performance issue on
>>>> the slack channel a few days ago.
>>>>
>>>> I am not sure if this is helpful, but I have been doing some
>>>> debugging of the SDF KafkaIO performance issue for our pipeline and
>>>> I would like to share some observations.
>>>>
>>>> It looks like in my case ReadFromKafkaDoFn.processElement() was
>>>> invoked within the same thread, and every time kafkaconsumer.poll()
>>>> is called, it returns some records, from 1 up to 200. So it
>>>> proceeds to run the pipeline steps. Each kafkaconsumer.poll() takes
>>>> about 0.8 ms, so in this case the polling and the running of the
>>>> pipeline are executed sequentially within a single thread: after
>>>> processing a batch of records, it has to wait about 0.8 ms before
>>>> it can process the next batch.
>>>>
>>>> Any suggestions would be appreciated.
>>>>
>>>> Hope that helps.
>>>>
>>>> Thanks and regards,
>>>>
>>>> Antonio.
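To make the poll-and-checkpoint loop described above concrete, here is
a heavily simplified, hypothetical sketch in the spirit of
ReadFromKafkaDoFn, not its actual code. The class name
SketchKafkaReadFn, the localhost:9092 broker, and the choice to start
at offset 0 are illustrative assumptions:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.beam.sdk.io.range.OffsetRange;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    @DoFn.UnboundedPerElement
    class SketchKafkaReadFn
        extends DoFn<TopicPartition, ConsumerRecord<byte[], byte[]>> {

      @GetInitialRestriction
      public OffsetRange initialRestriction(@Element TopicPartition tp) {
        // Start at offset 0 purely for illustration; the real DoFn
        // resolves start offsets from the consumer configuration.
        return new OffsetRange(0L, Long.MAX_VALUE);
      }

      @ProcessElement
      public ProcessContinuation processElement(
          @Element TopicPartition tp,
          RestrictionTracker<OffsetRange, Long> tracker,
          OutputReceiver<ConsumerRecord<byte[], byte[]>> out) {
        try (KafkaConsumer<byte[], byte[]> consumer = newConsumer(tp)) {
          consumer.seek(tp, tracker.currentRestriction().getFrom());
          while (true) {
            ConsumerRecords<byte[], byte[]> records =
                consumer.poll(Duration.ofMillis(100));
            if (records.isEmpty()) {
              // Nothing available right now: return with a
              // self-initiated checkpoint; the runner reschedules the
              // residual restriction later.
              return ProcessContinuation.resume();
            }
            for (ConsumerRecord<byte[], byte[]> record : records) {
              if (!tracker.tryClaim(record.offset())) {
                // The invoker checkpointed us (DirectRunner: every 100
                // outputs or 1 second); stop and let the residual
                // restriction be rescheduled.
                return ProcessContinuation.stop();
              }
              out.output(record);
            }
          }
        }
      }

      private KafkaConsumer<byte[], byte[]> newConsumer(TopicPartition tp) {
        Properties config = new Properties();
        config.put("bootstrap.servers", "localhost:9092"); // hypothetical
        config.put("key.deserializer",
            ByteArrayDeserializer.class.getName());
        config.put("value.deserializer",
            ByteArrayDeserializer.class.getName());
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(config);
        consumer.assign(Collections.singletonList(tp));
        return consumer;
      }
    }

Every resume() or failed tryClaim() above is a checkpoint, which is
exactly where the rescheduling overhead discussed in this thread comes
from.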
>>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
>>>> > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
>>>> >
>>>> > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> wrote:
>>>> >
>>>> > > Thanks for the pointer, Steve! I'll check it out. The execution
>>>> > > paths for UnboundedSource and the SDF wrapper are different.
>>>> > > It's highly possible that the regression comes either from the
>>>> > > invocation path for the SDF wrapper or from the implementation
>>>> > > of the SDF wrapper itself.
>>>> > >
>>>> > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:
>>>> > >
>>>> > >> Coincidentally, someone else in the ASF slack mentioned [1]
>>>> > >> yesterday that they were seeing significantly reduced
>>>> > >> performance using KafkaIO.Read w/ the SDF wrapper vs the
>>>> > >> unbounded source. They mentioned they were using Flink 1.9.
>>>> > >>
>>>> > >> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>>>> > >>
>>>> > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:
>>>> > >>
>>>> > >>> Hi Steve,
>>>> > >>>
>>>> > >>> I think the major performance regression comes from
>>>> > >>> OutputAndTimeBoundedSplittableProcessElementInvoker [1], which
>>>> > >>> checkpoints the DoFn based on time/output limits and uses
>>>> > >>> timers/state to reschedule work.
>>>> > >>>
>>>> > >>> [1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
>>>> > >>>
>>>> > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org> wrote:
>>>> > >>>
>>>> > >>>> I have a pipeline that reads from pubsub, does some
>>>> > >>>> aggregation, and writes to various places. Previously, in
>>>> > >>>> older versions of Beam, when running this in the
>>>> > >>>> DirectRunner, messages would go through the pipeline almost
>>>> > >>>> instantly, making it very easy to debug locally, etc.
>>>> > >>>>
>>>> > >>>> However, after upgrading to Beam 2.25, I noticed that it
>>>> > >>>> could take on the order of 5-10 minutes for messages to get
>>>> > >>>> from the pubsub read step to the next step in the pipeline
>>>> > >>>> (deserializing them, etc). The subscription being read from
>>>> > >>>> has on the order of 100,000 elements/sec arriving in it.
>>>> > >>>>
>>>> > >>>> Setting --experiments=use_deprecated_read fixes it and makes
>>>> > >>>> the pipeline behave as it did before.
>>>> > >>>>
>>>> > >>>> It seems like the SDF implementation in the DirectRunner is
>>>> > >>>> causing some kind of issue here, either buffering a very
>>>> > >>>> large amount of data before emitting it in a bundle, or
>>>> > >>>> something else. Has anyone else run into this?
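The checkpoint limits debated in this thread (DirectRunner's current
100 outputs or 1 second, and the proposed 1000 outputs or 5 seconds)
amount to a simple policy like the standalone sketch below. This
paraphrases the idea behind
OutputAndTimeBoundedSplittableProcessElementInvoker rather than
reproducing its implementation; the class name CheckpointPolicy is
hypothetical:

    import java.time.Duration;
    import java.time.Instant;

    final class CheckpointPolicy {
      private final int maxOutputs;         // DirectRunner: 100
      private final Duration maxBundleTime; // DirectRunner: 1 second
      private int outputs;
      private final Instant start;

      CheckpointPolicy(int maxOutputs, Duration maxBundleTime) {
        this.maxOutputs = maxOutputs;
        this.maxBundleTime = maxBundleTime;
        this.start = Instant.now();
      }

      // Called once per emitted element; true means "checkpoint now".
      boolean shouldCheckpointAfterOutput() {
        outputs++;
        return outputs >= maxOutputs
            || Duration.between(start, Instant.now())
                .compareTo(maxBundleTime) >= 0;
      }
    }

With Antonio's numbers (batches of up to 200 records arriving every
~0.8 ms), the 1-second limit fires long before the output limit, so
raising either limit changes how often residuals are rescheduled.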