[
https://issues.apache.org/jira/browse/SPARK-17147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15453884#comment-15453884
]
Sean McKibben commented on SPARK-17147:
---------------------------------------
I think Kafka's log compaction is still designed for sequential reading, even
if the offsets are not consecutive for a compacted topic. Kafka's internal log
cleaner process copies log segments to new, compacted files, so the messages
are still stored sequentially even if the offset metadata for them increases
by more than one. The typical consumer just calls poll() to get the next
records, regardless of their offsets, but Spark's CachedKafkaConsumer checks
the offset of each record before calling poll(), and if that offset isn't the
previous record's offset + 1, it calls consumer.seek() before the next poll(),
which I think is producing the dramatic slowdown I've seen.
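To illustrate, here is a minimal, self-contained sketch of that check (illustrative only, not Spark's actual code): a strict "actual offset == expected offset" test forces one seek at every compaction gap, even though a plain sequential poll() would have returned the same records with no seeks at all.

```java
import java.util.Arrays;
import java.util.List;

public class SeekCountDemo {
    /**
     * Count how many seeks a consumer would perform if it insists that each
     * record's offset equal the previous record's offset + 1, as described
     * above for CachedKafkaConsumer. Every gap left by log compaction
     * triggers one seek.
     */
    static int seeksWithStrictCheck(List<Long> recordOffsets) {
        int seeks = 0;
        long expected = recordOffsets.get(0);
        for (long actual : recordOffsets) {
            if (actual != expected) {
                seeks++;             // the real code would call consumer.seek() here
            }
            expected = actual + 1;
        }
        return seeks;
    }

    public static void main(String[] args) {
        // Offsets after log compaction: gaps where old keys were cleaned out,
        // but the surviving records are still laid out sequentially on disk.
        List<Long> compacted = Arrays.asList(0L, 1L, 4L, 5L, 9L, 10L);
        System.out.println(seeksWithStrictCheck(compacted)); // prints 2 (one seek per gap)
    }
}
```

On an uncompacted topic the offsets are consecutive, the check never fails, and the read stays a pure sequential poll() loop, which is why the slowdown only shows up on compacted topics.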
It is certainly possible, using a non-Spark Kafka consumer, to get equivalent
read speeds regardless of whether a topic is compacted.
I think the interplay between the CachedKafkaConsumer and the KafkaRDD might
need to be adjusted. I haven't looked to see if more than one KafkaRDD will
ever be asking for records from a single CachedKafkaConsumer instance, but
since CachedKafkaConsumer inspects each offset to check that it is exactly
the offset requested, and not just >= the requested offset, I'm guessing
there was a reason for that strictness.
The main issue here is that it's becoming apparent that Kafka consumers can't
assume consecutive offsets. Unfortunately, that is an assumption the
Spark-Kafka integration was making, and I think that assumption will need to
be removed.
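The difference between the two advancement rules can be sketched as follows (a hypothetical simulation, not Spark's actual code: poll() is modeled as "first record at or after the requested offset", which matches Kafka's fetch semantics). Advancing by the record's actual offset + 1 tolerates gaps; advancing by the requested offset + 1 breaks as soon as a strict offset check meets a compaction gap.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;

public class OffsetGapDemo {
    /** Model Kafka's fetch: return the first record offset >= from. */
    static long poll(List<Long> log, long from) {
        for (long o : log) if (o >= from) return o;
        throw new NoSuchElementException("end of log");
    }

    /** Consume the whole log, advancing by record.offset + 1 (gap-tolerant). */
    static List<Long> consumeFixed(List<Long> log) {
        List<Long> seen = new ArrayList<>();
        long request = log.get(0);
        while (seen.size() < log.size()) {
            long rec = poll(log, request);
            seen.add(rec);
            request = rec + 1;   // follow the record's actual offset
        }
        return seen;
    }

    /** Consume assuming consecutive offsets; fails at the first gap. */
    static List<Long> consumeAssumingConsecutive(List<Long> log) {
        List<Long> seen = new ArrayList<>();
        long request = log.get(0);
        while (seen.size() < log.size()) {
            long rec = poll(log, request);
            if (rec != request) {
                // the strict offset assertion fails here on a compacted topic
                throw new IllegalStateException(
                    "expected offset " + request + ", got " + rec);
            }
            seen.add(rec);
            request += 1;        // assumes the next offset is always +1
        }
        return seen;
    }

    public static void main(String[] args) {
        // Compacted topic: offsets 102-104 and 107-109 were cleaned out.
        List<Long> compacted = Arrays.asList(100L, 101L, 105L, 106L, 110L);
        System.out.println(consumeFixed(compacted)); // all five records
        try {
            consumeAssumingConsecutive(compacted);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```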
> Spark Streaming Kafka 0.10 Consumer Can't Handle Non-consecutive Offsets
> ------------------------------------------------------------------------
>
> Key: SPARK-17147
> URL: https://issues.apache.org/jira/browse/SPARK-17147
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 2.0.0
> Reporter: Robert Conrad
>
> When Kafka does log compaction, offsets often end up with gaps, meaning the
> next requested offset will frequently not be offset + 1. The logic in
> KafkaRDD & CachedKafkaConsumer has a baked in assumption that the next offset
> will always be just an increment of 1 above the previous offset.
> I have worked around this problem by changing CachedKafkaConsumer to use the
> returned record's offset, from:
> {{nextOffset = offset + 1}}
> to:
> {{nextOffset = record.offset + 1}}
> and changed KafkaRDD from:
> {{requestOffset += 1}}
> to:
> {{requestOffset = r.offset() + 1}}
> (I also had to change some assert logic in CachedKafkaConsumer).
> There's a strong possibility that I have misconstrued how to use the
> streaming Kafka consumer, and I'm happy to close this out if that's the case.
> If, however, it is supposed to support non-consecutive offsets (e.g. due to
> log compaction) I am also happy to contribute a PR.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)