[ https://issues.apache.org/jira/browse/SAMZA-461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14216968#comment-14216968 ]

Ben Kirwin commented on SAMZA-461:
----------------------------------

For me, it's a bit unintuitive that the reset logic would come into play here at all.

FWIW, I've patched OffsetManager in my local build to look something like this:

{code}
val nextOffset = {
  val requested = systemStreamPartitionMetadata.getOffset(offsetType)

  if (requested == null) {
    // An empty partition has no offset of the requested type; fall back to
    // the upcoming offset instead of letting auto.offset.reset kick in.
    warn(s"Null offset for offset type $offsetType in $systemStreamPartition. Starting read from next possible offset.")
    systemStreamPartitionMetadata.getOffset(OffsetType.UPCOMING)
  } else requested
}
{code}

So if there's nothing in the queue, it will start the consumer at the upcoming 
offset. (Seems like it would be cleaner to avoid the null case in 
SystemStreamPartitionMetadata.upcomingOffset entirely, but this at least has 
the behaviour I'm looking for.)
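For illustration, here is the same fallback as a standalone helper (a minimal sketch against Samza's SystemStreamMetadata API; resolveStartingOffset is just a hypothetical name, the real logic lives inside OffsetManager):

{code}
import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata}

// Resolve the starting offset, falling back to UPCOMING when the
// requested offset type has no value (e.g. an empty partition).
def resolveStartingOffset(meta: SystemStreamPartitionMetadata, offsetType: OffsetType): String =
  Option(meta.getOffset(offsetType)).getOrElse(meta.getOffset(OffsetType.UPCOMING))

// An empty Kafka partition: no oldest or newest offset, upcoming is "0".
val empty = new SystemStreamPartitionMetadata(null, null, "0")
assert(resolveStartingOffset(empty, OffsetType.OLDEST) == "0")
{code}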

> Race when initializing offsets at job startup leads to skipped messages
> -----------------------------------------------------------------------
>
>                 Key: SAMZA-461
>                 URL: https://issues.apache.org/jira/browse/SAMZA-461
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>            Reporter: Ben Kirwin
>             Fix For: 0.9.0
>
>
> If the default offset is set to oldest, a Samza job should start from the 
> very beginning of the stream:
> {code}
> systems.kafka.samza.offset.default=oldest
> {code}
> However, if the very first messages are added to the stream while the job is 
> booting up, it's possible for those messages to be skipped entirely.
> When there are no messages in a stream, Samza reads the 'oldest' offset as 
> null. This null value is added to the map of starting offsets in the offset 
> manager. When the Kafka broker proxy gets the null offset, it complains:
> {code}
> It appears that we received an invalid or empty offset [...] Attempting to 
> use Kafka's auto.offset.reset setting. This can result in data loss if 
> processing continues.
> {code}
> If auto.offset.reset is not manually configured, this defaults to starting 
> with the latest value. If messages have appeared in the stream in the 
> meantime, the job will start *after* those messages, and data is indeed lost.
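> As an interim workaround (a hedged sketch, assuming Samza's usual
> systems.*.consumer.* pass-through of Kafka consumer properties), the
> reset policy can be pinned to the smallest available offset so the
> fallback at least doesn't skip data:
> {code}
> systems.kafka.consumer.auto.offset.reset=smallest
> {code}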
> It seems like setting oldestOffset to equal upcomingOffset would solve the 
> issue. (It's also semantically reasonable -- the upcoming offset is indeed 
> the oldest offset that will ever be read.)
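>
> A minimal sketch of that idea at metadata-construction time (the
> (oldest, newest, upcoming) constructor matches Samza's
> SystemStreamPartitionMetadata; partitionMetadata itself is hypothetical):
> {code}
> import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
>
> // For an empty partition the oldest offset is null; substitute the
> // upcoming offset, which is the oldest offset that will ever be read.
> def partitionMetadata(oldest: String, newest: String, upcoming: String) = {
>   val safeOldest = if (oldest == null) upcoming else oldest
>   new SystemStreamPartitionMetadata(safeOldest, newest, upcoming)
> }
> {code}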


