[
https://issues.apache.org/jira/browse/SAMZA-461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14202200#comment-14202200
]
Chris Riccomini commented on SAMZA-461:
---------------------------------------
I think the most intuitive thing is to have the KafkaSystemConsumer pay
attention to the samza.offset.default setting when auto.offset.reset isn't set.
If samza.offset.default is set to "oldest", then auto.offset.reset should be
set to the equivalent Kafka value ("smallest") as well, unless it's already
been set manually in the configs. That way, if the oldest offset is out of
range, auto.offset.reset will kick in and fetch the current oldest offset.
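As a rough sketch of the fallback described above (hypothetical names, not the actual KafkaSystemConsumer code; Kafka's old consumer expects the values "smallest" and "largest" for auto.offset.reset):

```python
# Sketch of the proposed fallback: derive auto.offset.reset from
# samza.offset.default unless the user set it manually. The function
# and dict names are illustrative, not Samza's actual API.

OFFSET_DEFAULT_TO_RESET = {
    "oldest": "smallest",    # start from the beginning of the log
    "upcoming": "largest",   # start from the tail of the log
}

def resolve_auto_offset_reset(consumer_config, samza_offset_default):
    """Pick the effective auto.offset.reset for the Kafka consumer."""
    if "auto.offset.reset" in consumer_config:
        # A manually configured value always wins.
        return consumer_config["auto.offset.reset"]
    return OFFSET_DEFAULT_TO_RESET.get(samza_offset_default, "largest")
```

With this in place, a job configured with samza.offset.default=oldest would transparently get auto.offset.reset=smallest, so an out-of-range oldest offset resolves back to the start of the log rather than the tail.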
Another way we could handle this would be to model it properly in Samza, but
that's a bit tricky: we don't have the concept of "out of range" right now. I
could see us getting into a race condition where we keep hitting out-of-range
exceptions, refreshing the metadata, and then getting another out-of-range
exception.
> Race when initializing offsets at job startup leads to skipped messages
> -----------------------------------------------------------------------
>
> Key: SAMZA-461
> URL: https://issues.apache.org/jira/browse/SAMZA-461
> Project: Samza
> Issue Type: Bug
> Reporter: Ben Kirwin
>
> If the default offset is set to oldest, a Samza job should start from the
> very beginning of the stream:
> {code}
> systems.kafka.samza.offset.default=oldest
> {code}
> However, if the very first messages are added to the stream while the job is
> booting up, it's possible for those messages to be skipped entirely.
> When there are no messages in a stream, Samza reads the 'oldest' offset as
> null. This null value is added to the map of starting offsets in the offset
> manager. When the Kafka broker proxy gets the null offset, it complains:
> {code}
> It appears that we received an invalid or empty offset [...] Attempting to
> use Kafka's auto.offset.reset setting. This can result in data loss if
> processing continues.
> {code}
> If auto.offset.reset is not manually configured, this defaults to starting
> with the latest value. If messages have appeared in the stream in the
> meantime, the job will start *after* those messages, and data is indeed lost.
> It seems like setting oldestOffset to equal upcomingOffset would solve the
> issue. (It's also semantically reasonable -- the upcoming offset is indeed
> the oldest offset that will ever be read.)
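The fix suggested in the issue description might look something like this (a minimal sketch with illustrative names, not Samza's actual offset-manager code):

```python
# Sketch of the suggested fix: when a partition is empty, Kafka reports
# no "oldest" offset, so fall back to the upcoming offset instead of
# propagating null. The upcoming offset is the oldest offset that will
# ever be readable from that partition.

def starting_offset(oldest_offset, upcoming_offset, default="oldest"):
    """Resolve the starting offset for one partition at job startup."""
    if default == "oldest":
        if oldest_offset is None:
            # Empty partition: start at the upcoming offset so no
            # message written during startup is ever skipped.
            return upcoming_offset
        return oldest_offset
    return upcoming_offset
```

Because the resolved offset is never null, the broker proxy no longer falls through to auto.offset.reset, and messages written while the job boots are consumed rather than skipped.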
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)