Thanks for the pointer, Steve! I'll check it out. The execution paths for
UnboundedSource and the SDF wrapper are different, so it's quite possible that
the regression comes either from the invocation path for the SDF wrapper or
from the implementation of the SDF wrapper itself.
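
For anyone who wants to A/B the two read paths locally while we dig in, here
is a minimal sketch (assuming the Beam Java SDK; the experiment flag is the
one Steve mentions below, and the class name is just illustrative):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ReadPathComparison {
      public static void main(String[] args) {
        // With --experiments=use_deprecated_read the read uses the legacy
        // UnboundedSource path; drop the flag to go through the SDF wrapper.
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(
                    "--runner=DirectRunner", "--experiments=use_deprecated_read")
                .create();
        Pipeline pipeline = Pipeline.create(options);
        // Build the same read + aggregation for both runs and compare how
        // quickly the first elements reach the step after the read.
        pipeline.run().waitUntilFinish();
      }
    }

Running the same pipeline once with and once without the flag should isolate
whether the slowdown is specific to the SDF wrapper path.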

On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:

> Coincidentally, someone else in the ASF Slack mentioned [1] yesterday that
> they were seeing significantly reduced performance using KafkaIO.Read with
> the SDF wrapper vs. the unbounded source.  They mentioned they were using
> Flink 1.9.
>
> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
>
> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:
>
>> Hi Steve,
>>
>> I think the major performance regression comes from
>> OutputAndTimeBoundedSplittableProcessElementInvoker [1], which checkpoints
>> the DoFn based on a time/output limit and uses timers/state to reschedule
>> the remaining work.
>>
>> [1]
>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
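>>
>> To illustrate the idea (this is not the invoker's actual code, just a
>> sketch of the checkpoint-and-resume pattern with made-up limits), a
>> splittable DoFn that stops after a fixed number of outputs looks like:
>>
>>     import org.apache.beam.sdk.io.range.OffsetRange;
>>     import org.apache.beam.sdk.transforms.DoFn;
>>     import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
>>
>>     class BoundedOutputFn extends DoFn<byte[], Long> {
>>       private static final int MAX_OUTPUTS_PER_CALL = 10_000;
>>
>>       @GetInitialRestriction
>>       public OffsetRange initialRestriction(@Element byte[] element) {
>>         return new OffsetRange(0, 1_000_000);
>>       }
>>
>>       @ProcessElement
>>       public ProcessContinuation process(
>>           RestrictionTracker<OffsetRange, Long> tracker,
>>           OutputReceiver<Long> out) {
>>         int produced = 0;
>>         for (long pos = tracker.currentRestriction().getFrom();
>>             tracker.tryClaim(pos);
>>             pos++) {
>>           out.output(pos);
>>           if (++produced >= MAX_OUTPUTS_PER_CALL) {
>>             // Checkpoint here; the runner reschedules the rest of the range.
>>             return ProcessContinuation.resume();
>>           }
>>         }
>>         return ProcessContinuation.stop();
>>       }
>>     }
>>
>> The invoker applies the same kind of time/output cutoff to the DoFn that
>> wraps an UnboundedSource, which is where the rescheduling cost comes in.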
>>
>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org> wrote:
>>
>>> I have a pipeline that reads from Pub/Sub, does some aggregation, and
>>> writes to various places.  Previously, in older versions of Beam, when
>>> running this in the DirectRunner, messages would go through the pipeline
>>> almost instantly, making it very easy to debug locally, etc.
>>>
>>> However, after upgrading to Beam 2.25, I noticed that it could take on
>>> the order of 5-10 minutes for messages to get from the Pub/Sub read step to
>>> the next step in the pipeline (deserializing them, etc.).  The subscription
>>> being read from has on the order of 100,000 elements/sec arriving in it.
>>>
>>> Setting --experiments=use_deprecated_read fixes it, and makes the
>>> pipeline behave as it did before.
>>>
>>> It seems like the SDF implementation in the DirectRunner here is causing
>>> some kind of issue, either buffering a very large amount of data before
>>> emitting it in a bundle, or something else.  Has anyone else run into this?
>>>
>>
