Hello Spark folks,

I'm reading a compacted Kafka topic with Spark 2.4, using the direct stream -
KafkaUtils.createDirectStream(...). I have configured the necessary options for
compacted topics, so it is processed with CompactedKafkaRDDIterator.
It works well; however, when the topic contains many gaps, processing becomes
very slow and the executors are idle about 90% of the time.
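
For context, the setup looks roughly like this (a simplified sketch; broker,
topic and group id are placeholders, and I believe
spark.streaming.kafka.allowNonConsecutiveOffsets=true is the option that
switches processing to the compacted iterator):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf()
  .setAppName("compacted-topic-reader")
  // required for compacted topics, otherwise the batch fails on offset gaps
  .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
  // per-partition rate limit in records/sec (placeholder value)
  .set("spark.streaming.kafka.maxRatePerPartition", "2000")

val ssc = new StreamingContext(conf, Seconds(60))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092",                  // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "compacted-reader",                      // placeholder
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("my-compacted-topic"), kafkaParams))

stream.foreachRDD { rdd =>
  // with many gaps, the real record count is far below the offset range size
  println(s"records in batch: ${rdd.count()}")
}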

I had a look at the source and here are my findings:
Spark first computes the number of records to stream from Kafka (processing
rate * batch window size). That record count is translated into a Kafka
offset range (offset_from, offset_to), and eventually the iterator reads the
records within those offset boundaries.
This works fine until there are many gaps in the topic, which reduce the
real number of processed records.
Let's say we want to read 100k records in a 60 sec window. With gaps, only
10k real records come back (because the other 90k offsets are just compacted
gaps), so the executor works for roughly 6 sec and sits idle for the
remaining 54 sec.
I'd like to utilize the executors as much as possible.
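
To spell out the arithmetic (a toy illustration only, not the actual
DirectKafkaInputDStream code; the numbers are the ones from the example
above, and the starting offset is a placeholder):

// processing rate Spark assumes it can sustain
val processingRate = 100000.0 / 60.0           // ~1666 records/sec
val batchWindowSec = 60L
val recordsPlanned = (processingRate * batchWindowSec).toLong  // 100,000

// the planned record count becomes an offset range of the same width
val offsetFrom = 1000000L                      // placeholder starting offset
val offsetTo   = offsetFrom + recordsPlanned

// in a heavily compacted range most offsets no longer hold a record
val realRecords = 10000L                       // 90k offsets were compacted away
val workSec = realRecords / processingRate     // ~6 sec of real work
val idleSec = batchWindowSec - workSec         // ~54 sec idle
println(f"work: $workSec%.0f s, idle: $idleSec%.0f s")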

A great feature would be to read 100k real records (skipping the gaps), no
matter how wide the offset range needs to be.

I've tried to improve this with backpressure and a custom RateEstimator
(decorating PidRateEstimator and boosting the rate per second), sketched
below. I was even able to fully utilize the executors, but my approach has a
big problem where the compacted part of the topic meets the non-compacted
part: the executor then tries to read far too big a chunk of Kafka and the
whole processing dies.
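
Roughly, the decorator looks like this (a simplified sketch; RateEstimator
and PidRateEstimator are Spark-internal, so it has to be compiled inside the
org.apache.spark.streaming.scheduler.rate package, and the boost factor is
just an illustrative constant):

package org.apache.spark.streaming.scheduler.rate

// Delegates to the PID estimator and multiplies the resulting rate to
// compensate for compacted gaps. The factor is illustrative only.
class BoostedRateEstimator(
    delegate: RateEstimator,
    boostFactor: Double = 10.0) extends RateEstimator {

  override def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Double] =
    delegate.compute(time, elements, processingDelay, schedulingDelay)
      .map(_ * boostFactor)
}

// Example wiring (constructor arguments are the batch interval in ms and the
// usual PID proportional / integral / derivative / minRate values):
// val estimator = new BoostedRateEstimator(
//   new PidRateEstimator(60000L, 1.0, 0.2, 0.0, 100.0))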

BR,
Tomas
