Hi Jan,
it seems that what we would want is to couple the lifecycle of the
Reader not with the restriction but with the particular instance
of the (Un)boundedSource (after being split). That could be done in
the processing DoFn, if it contained a cache mapping each instance of
the source to its (possibly null, i.e. not yet open) reader. In
@NewTracker we could assign (or create) the reader for the tracker,
as the tracker is created for each restriction.
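As a rough sketch of the cache part of the idea (class and method names here
are made up, the @ProcessElement body is simplified, and the @NewTracker hook
that would hand the cached reader to a custom tracker is left out):

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;

class CachedReaderFn
    extends DoFn<UnboundedSource<byte[], UnboundedSource.CheckpointMark>, byte[]> {

  // Cache keyed by the (split) source instance. The value may be a not-yet-open
  // reader, so the reader's lifecycle follows the source instance, not the restriction.
  private transient Map<
          UnboundedSource<byte[], UnboundedSource.CheckpointMark>,
          UnboundedSource.UnboundedReader<byte[]>>
      readers;

  @Setup
  public void setup() {
    readers = new HashMap<>();
  }

  private UnboundedSource.UnboundedReader<byte[]> readerFor(
      UnboundedSource<byte[], UnboundedSource.CheckpointMark> source,
      PipelineOptions options) {
    // Create the reader lazily on first use of this source instance; later
    // restrictions for the same source reuse the already-created reader.
    return readers.computeIfAbsent(
        source,
        s -> {
          try {
            return s.createReader(options, null);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        });
  }

  @ProcessElement
  public void process(
      @Element UnboundedSource<byte[], UnboundedSource.CheckpointMark> source,
      OutputReceiver<byte[]> out,
      PipelineOptions options)
      throws IOException {
    UnboundedSource.UnboundedReader<byte[]> reader = readerFor(source, options);
    // Simplified: a real version would remember whether the reader was already
    // started and would checkpoint via reader.getCheckpointMark().
    for (boolean more = reader.start(); more; more = reader.advance()) {
      out.output(reader.getCurrent());
    }
  }

  @Teardown
  public void teardown() throws IOException {
    for (UnboundedSource.UnboundedReader<byte[]> reader : readers.values()) {
      reader.close();
    }
  }
}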
WDYT?
I was thinking about this, but it seems that it is not applicable to the
way UnboundedSource and UnboundedReader work together. Please correct me
if I'm wrong. The UnboundedReader is created from the UnboundedSource per
CheckpointMark[1], which means that for certain sources the CheckpointMark
can affect attributes such as the start position of the reader when
resuming. So a single UnboundedSource could be mapped to multiple readers
because of different instances of CheckpointMark. That's also the reason
why we use CheckpointMark as the restriction.
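To make this concrete, a small sketch (resumeFrom is a made-up helper name,
but createReader(options, checkpointMark) is the signature from [1]):

static <T, MarkT extends UnboundedSource.CheckpointMark>
    UnboundedSource.UnboundedReader<T> resumeFrom(
        UnboundedSource<T, MarkT> source, MarkT mark, PipelineOptions options)
        throws IOException {
  // The reader is a function of (source, checkpoint mark): a null mark starts from
  // scratch, while a non-null mark resumes from whatever position it encodes
  // (e.g. Kafka offsets). So one split source can back several readers, one per mark.
  return source.createReader(options, mark);
}

With CheckpointMark as the restriction, each restriction therefore gets its own reader.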
Please let me know if I misunderstand your suggestion.
[1]
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
On Mon, Dec 21, 2020 at 9:18 AM Antonio Si <antonio...@gmail.com> wrote:
Hi Boyuan,
Sorry for my late reply. I was off for a few days.
I didn't use DirectRunner. I am using FlinkRunner.
We measured the number of Kafka messages that we can process per second.
With Beam v2.26 with --experiments=use_deprecated_read and --fasterCopy=true,
we are able to consume 13K messages per second, but with Beam v2.26 without
the use_deprecated_read option we are only able to process 10K messages per
second for the same pipeline.
Thanks and regards,
Antonio.
On 2020/12/11 22:19:40, Boyuan Zhang <boyu...@google.com> wrote:
> Hi Antonio,
>
> Thanks for the details! Which version of Beam SDK are you using? And are
> you using --experiments=beam_fn_api with DirectRunner to launch your
> pipeline?
>
> For ReadFromKafkaDoFn.processElement(), it takes a Kafka topic+partition
> as its input element, and a KafkaConsumer is assigned to this
> topic+partition and then polls records continuously. The Kafka consumer
> will pause reading and return from the process fn when
>
>    - there are currently no available records (this is an SDF feature,
>    the self-initiated checkpoint), or
>    - the OutputAndTimeBoundedSplittableProcessElementInvoker issues a
>    checkpoint request to ReadFromKafkaDoFn to get partial results. The
>    checkpoint frequency for DirectRunner is every 100 output records or
>    every 1 second.
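>
> To make the control flow concrete, here is a simplified sketch of that poll
> loop (names and the poll timeout are illustrative, not the actual
> ReadFromKafkaDoFn code):
>
> private DoFn.ProcessContinuation pollLoop(
>     Consumer<byte[], byte[]> consumer,
>     RestrictionTracker<OffsetRange, Long> tracker,
>     DoFn.OutputReceiver<ConsumerRecord<byte[], byte[]>> receiver) {
>   while (true) {
>     ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
>     if (records.isEmpty()) {
>       // No records available right now: self-initiated checkpoint, return resume()
>       // and let the runner reschedule the residual restriction later.
>       return DoFn.ProcessContinuation.resume();
>     }
>     for (ConsumerRecord<byte[], byte[]> record : records) {
>       if (!tracker.tryClaim(record.offset())) {
>         // The invoker asked for a checkpoint, so stop claiming offsets and finish
>         // this bundle with the partial results produced so far.
>         return DoFn.ProcessContinuation.stop();
>       }
>       receiver.output(record);
>     }
>   }
> }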
>
> It seems like either the self-initiated checkpoint or the DirectRunner-issued
> checkpoint gives you the performance regression, since there is overhead
> when rescheduling residuals. In your case, it looks like the checkpoint
> behavior of OutputAndTimeBoundedSplittableProcessElementInvoker gives you
> 200 elements per batch. I want to understand what kind of performance
> regression you are noticing: is it slower to output the same amount of
> records?
>
> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio...@gmail.com> wrote:
>
> > Hi Boyuan,
> >
> > This is Antonio. I reported the KafkaIO.read() performance issue on the
> > slack channel a few days ago.
> >
> > I am not sure if this is helpful, but I have been doing some debugging on
> > the SDK KafkaIO performance issue for our pipeline and I would like to
> > provide some observations.
> >
> > It looks like in my case the ReadFromKafkaDoFn.processElement() was
> > invoked within the same thread, and every time kafkaConsumer.poll() is
> > called it returns some records, from 1 up to 200 records, so it proceeds
> > to run the pipeline steps. Each kafkaConsumer.poll() takes about 0.8ms.
> > So, in this case, the polling and the running of the pipeline are
> > executed sequentially within a single thread. After processing a batch
> > of records, it needs to wait about 0.8ms before it can process the next
> > batch of records.
> >
> > Any suggestions would be appreciated.
> >
> > Hope that helps.
> >
> > Thanks and regards,
> >
> > Antonio.
> >
> > On 2020/12/04 19:17:46, Boyuan Zhang <boyu...@google.com> wrote:
> > > Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
> > >
> > > On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyu...@google.com> wrote:
> > >
> > > > Thanks for the pointer, Steve! I'll check it out. The execution paths
> > > > for UnboundedSource and the SDF wrapper are different. It's highly
> > > > possible that the regression comes either from the invocation path for
> > > > the SDF wrapper, or from the implementation of the SDF wrapper itself.
> > > >
> > > > On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniem...@apache.org> wrote:
> > > >
> > > >> Coincidentally, someone else in the ASF slack mentioned [1] yesterday
> > > >> that they were seeing significantly reduced performance using
> > > >> KafkaIO.Read w/ the SDF wrapper vs the unbounded source. They
> > > >> mentioned they were using flink 1.9.
> > > >>
> > > >> [1] https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
> > > >>
> > > >> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyu...@google.com> wrote:
> > > >>
> > > >>> Hi Steve,
> > > >>>
> > > >>> I think the major performance regression comes from
> > > >>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will
> > > >>> checkpoint the DoFn based on a time/output limit and use
> > > >>> timers/state to reschedule work.
> > > >>>
> > > >>> [1]
> > > >>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
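> > > >>>
> > > >>> As a rough illustration of that bounding logic (a sketch only, with
> > > >>> hypothetical parameter names, not the actual invoker code):
> > > >>>
> > > >>> static boolean shouldCheckpoint(
> > > >>>     long outputsSoFar, long maxOutputs,
> > > >>>     Instant processingStart, Instant now, Duration maxBundleTime) {
> > > >>>   // Once either bound is hit, the invoker checkpoints the DoFn and the
> > > >>>   // remaining restriction is rescheduled through timers/state.
> > > >>>   return outputsSoFar >= maxOutputs
> > > >>>       || new Duration(processingStart, now).isLongerThan(maxBundleTime);
> > > >>> }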
> > > >>>
> > > >>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniem...@apache.org> wrote:
> > > >>>
> > > >>>> I have a pipeline that reads from pubsub, does some aggregation, and
> > > >>>> writes to various places. Previously, in older versions of beam, when
> > > >>>> running this in the DirectRunner, messages would go through the
> > > >>>> pipeline almost instantly, making it very easy to debug locally, etc.
> > > >>>>
> > > >>>> However, after upgrading to beam 2.25, I noticed that it could take on
> > > >>>> the order of 5-10 minutes for messages to get from the pubsub read
> > > >>>> step to the next step in the pipeline (deserializing them, etc). The
> > > >>>> subscription being read from has on the order of 100,000 elements/sec
> > > >>>> arriving in it.
> > > >>>>
> > > >>>> Setting --experiments=use_deprecated_read fixes it, and makes the
> > > >>>> pipeline behave as it did before.
> > > >>>>
> > > >>>> It seems like the SDF implementation in the DirectRunner here is
> > > >>>> causing some kind of issue, either buffering a very large amount of
> > > >>>> data before emitting it in a bundle, or something else. Has anyone
> > > >>>> else run into this?
> > > >>>>
> > > >>>
> > >
> >
>