Neil Fordyce created SAMZA-1169:
-----------------------------------

             Summary: Messages not processed if checkpoint offset is above 
valid range
                 Key: SAMZA-1169
                 URL: https://issues.apache.org/jira/browse/SAMZA-1169
             Project: Samza
          Issue Type: Bug
    Affects Versions: 0.10.1
            Reporter: Neil Fordyce


If the recorded checkpoint of an input partition is higher than the highest 
available offset for that input partition, then messages are not processed.  
Message processing does not resume until the checkpoint is back in range.

The initialization logs for the job seem to indicate that the offset will be 
reset, but the offset remains the same.  In the case of the following logs, the 
checkpoint topic for the job repeatedly received new checkpoint offset messages 
for [app,0] of 1097328067.  Despite new messages being written to the 
partition, the job was not doing any message processing.
{code}
BrokerProxy [WARN] It appears that we received an invalid or empty offset 
Some(1097328068) for [app,0]. Attempting to use Kafka's auto.offset.reset 
setting. This can result in data loss if processing continues.
GetOffset [INFO] Checking if auto.offset.reset is defined for topic apps
GetOffset [INFO] Got reset of type largest.
{code}

*Repro steps*
1. Start a job which checkpoints using Kafka
2. Kill the job
3. Delete the job's input topic (or overwrite the checkpoint with one which is 
much larger than existing valid offsets)
4. Start producing into the input topic again
5. Start the job again

*Expected behaviour*
The checkpoint manager should overwrite the checkpoint with one which is in 
range, as per the auto.offset.reset setting.
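
A minimal sketch of the range check this would involve, in Java. This is not Samza's actual code: the class, method and enum names are hypothetical, and the oldest and upcoming offsets of the partition are assumed to have already been fetched from the broker.

```java
// Hypothetical illustration only; none of these names exist in Samza.
final class OffsetResetSketch {

    // Mirrors the two auto.offset.reset values, "smallest" and "largest".
    enum ResetPolicy { SMALLEST, LARGEST }

    /**
     * Returns the offset to resume from.
     *
     * @param fetchOffset    offset the consumer wants to read next
     * @param oldestOffset   oldest offset still available in the partition
     * @param upcomingOffset offset that the next produced message will get
     * @param policy         what to do when fetchOffset is out of range
     */
    static long resolveStartingOffset(long fetchOffset,
                                      long oldestOffset,
                                      long upcomingOffset,
                                      ResetPolicy policy) {
        boolean inRange = fetchOffset >= oldestOffset && fetchOffset <= upcomingOffset;
        if (inRange) {
            return fetchOffset;
        }
        // Out of range: fall back to the configured reset policy instead of
        // keeping the stale offset.
        return policy == ResetPolicy.SMALLEST ? oldestOffset : upcomingOffset;
    }

    public static void main(String[] args) {
        // Fetch offset taken from the log above; the partition bounds are made up.
        long resolved = resolveStartingOffset(1097328068L, 0L, 500L, ResetPolicy.LARGEST);
        System.out.println("resume from " + resolved);
    }
}
```

With the fetch offset from the logs above and a recreated partition whose upcoming offset is far lower, a reset of type largest would resume from the upcoming offset, and the next checkpoint written would then be in range.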

*Observed behaviour*
The job does not process any messages until the checkpointed offset is in range 
again.  The checkpoint is not reset to a valid offset, despite logs indicating 
it will be.  Repeated new checkpoint records are written to the checkpoint 
topic with the same out-of-range offset.  The job appears to consume messages 
according to [messages-read 
metrics|https://samza.apache.org/learn/documentation/0.11/container/metrics-table.html#kafka-system-consumer-metrics],
 but process() is not called.  

Situations in which the checkpointed offset can end up out of range:
1. Topic deletion and recreation
2. Loss of several Kafka brokers resulting in partition loss



