[ https://issues.apache.org/jira/browse/SAMZA-1169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940963#comment-15940963 ]

Neil Fordyce commented on SAMZA-1169:
-------------------------------------

We worked around this issue by using the checkpoint tool to set the 
checkpoint to a valid offset.  However, it would be preferable for the 
checkpoint to be reset automatically during job initialization, according to 
the auto.offset.reset setting.
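A minimal sketch of the automatic reset suggested above, assuming Kafka 0.8-style semantics where "smallest" means the earliest retained offset and "largest" means the log end offset (the offset the next produced message will receive), and assuming the job resumes at checkpoint + 1 (which matches the log below: checkpoint 1097328067, requested offset 1097328068). The class `OffsetResolver` and its methods are hypothetical illustrations, not Samza's API.

```java
import java.util.Locale;

/** Hypothetical helper (not part of Samza): picks a fetch offset inside a partition's valid range. */
public final class OffsetResolver {
    /** Kafka 0.8-style auto.offset.reset values (assumed). */
    enum ResetPolicy { SMALLEST, LARGEST }

    static ResetPolicy parsePolicy(String value) {
        return ResetPolicy.valueOf(value.trim().toUpperCase(Locale.ROOT));
    }

    /**
     * @param nextOffset offset the job wants to read next (checkpoint + 1), or null if no checkpoint
     * @param earliest   oldest offset still retained by the broker
     * @param logEnd     offset the next produced message will receive
     * @param policy     reset policy applied when nextOffset is missing or out of range
     */
    static long resolve(Long nextOffset, long earliest, long logEnd, ResetPolicy policy) {
        boolean inRange = nextOffset != null && nextOffset >= earliest && nextOffset <= logEnd;
        if (inRange) {
            return nextOffset; // checkpoint is usable as-is
        }
        // Out of range (e.g. topic deleted and recreated): fall back to the reset policy.
        return policy == ResetPolicy.SMALLEST ? earliest : logEnd;
    }

    public static void main(String[] args) {
        ResetPolicy policy = parsePolicy("largest");
        // Requested offset far above the range of a freshly recreated topic holding offsets 0..499.
        long start = resolve(1097328068L, 0L, 500L, policy);
        System.out.println(start); // 500
    }
}
```

The key point is that the resolved value, rather than the stale checkpoint, must become the job's new starting position.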

> Messages not processed if checkpoint offset is above valid range
> ----------------------------------------------------------------
>
>                 Key: SAMZA-1169
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1169
>             Project: Samza
>          Issue Type: Bug
>    Affects Versions: 0.10.1
>            Reporter: Neil Fordyce
>
> If the recorded checkpoint of an input partition is higher than the highest 
> available offset for that input partition, then messages are not 
> processed.  Message processing does not resume until the checkpoint is back 
> in range.
> The job's initialization logs indicate that the offset will be reset, but 
> the offset remains the same.  In the case of the following logs, the job's 
> checkpoint topic repeatedly received new checkpoint messages recording 
> offset 1097328067 for \[app,0\].  Although new messages were being written 
> to the partition, the job did not process any of them.
> {code}
> BrokerProxy [WARN] It appears that we received an invalid or empty offset 
> Some(1097328068) for [app,0]. Attempting to use Kafka's auto.offset.reset 
> setting. This can result in data loss if processing continues.
> GetOffset [INFO] Checking if auto.offset.reset is defined for topic apps
> GetOffset [INFO] Got reset of type largest.
> {code}
> *Repro steps*
> 1. Start a job which checkpoints using Kafka
> 2. Kill the job
> 3. Delete the job's input topic (or overwrite the checkpoint with one which 
> is much larger than existing valid offsets)
> 4. Start producing into the deleted topic again
> 5. Start the job again
> *Expected behaviour*
> The checkpoint manager should overwrite the checkpoint with one which is in 
> range, as per the auto.offset.reset setting.
> *Observed behaviour*
> The job does not process any messages until the checkpointed offset is in 
> range again.  The checkpoint is not reset to a valid offset, despite logs 
> indicating it will be.  Repeated new checkpoint records are written to the 
> checkpoint topic with the same out-of-range offset.  The job appears to 
> consume messages according to the [messages-read metrics|https://samza.apache.org/learn/documentation/0.11/container/metrics-table.html#kafka-system-consumer-metrics], 
> but process() is not called.
> Situations in which an offset reset can occur:
> 1. Topic deletion and recreation
> 2. Loss of several Kafka brokers, resulting in partition loss
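The expected behaviour described in the issue, where the checkpoint manager replaces an out-of-range checkpoint according to auto.offset.reset, could be sketched as a repair pass over all input partitions at initialization. This assumes each partition's valid fetch range is [earliest, logEnd] and, for simplicity, stores the next offset to read rather than Samza's last-processed offset. `CheckpointRepair`, `Range`, and `repair` are hypothetical names; a real implementation would write the returned map back to the checkpoint topic so the stale offset is not re-checkpointed.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of an init-time checkpoint repair step; not Samza's API. */
public final class CheckpointRepair {
    /** Valid fetch range for one partition: [earliest, logEnd]. */
    record Range(long earliest, long logEnd) {}

    /**
     * Returns a copy of startOffsets in which every out-of-range entry is replaced
     * by the reset target ("largest" -> logEnd, "smallest" -> earliest).
     *
     * @param startOffsets   partition -> next offset to read (derived from the checkpoint)
     * @param ranges         partition -> currently valid offset range
     * @param resetToLargest true for "largest", false for "smallest"
     */
    static Map<String, Long> repair(Map<String, Long> startOffsets,
                                    Map<String, Range> ranges,
                                    boolean resetToLargest) {
        Map<String, Long> repaired = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : startOffsets.entrySet()) {
            String partition = entry.getKey();
            long offset = entry.getValue();
            Range range = ranges.get(partition);
            if (range == null) {
                throw new IllegalArgumentException("No offset range known for " + partition);
            }
            if (offset >= range.earliest() && offset <= range.logEnd()) {
                repaired.put(partition, offset); // still valid, keep it
            } else {
                long target = resetToLargest ? range.logEnd() : range.earliest();
                System.out.printf("Resetting %s from %d to %d%n", partition, offset, target);
                repaired.put(partition, target);
            }
        }
        return repaired;
    }

    public static void main(String[] args) {
        Map<String, Long> start = new LinkedHashMap<>();
        start.put("[app,0]", 1097328068L); // stale offset from before the topic was recreated
        start.put("[app,1]", 42L);         // still inside the valid range
        Map<String, Range> ranges = Map.of(
                "[app,0]", new Range(0L, 500L),
                "[app,1]", new Range(0L, 100L));
        // This map is what would be written back as the new checkpoint.
        System.out.println(repair(start, ranges, true));
    }
}
```

Repairing the whole checkpoint in one pass, rather than only adjusting the fetch position, addresses the observed symptom of the same out-of-range offset being checkpointed repeatedly.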



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
