Neil Fordyce created SAMZA-1169:
-----------------------------------
Summary: Messages not processed if checkpoint offset is above valid range
Key: SAMZA-1169
URL: https://issues.apache.org/jira/browse/SAMZA-1169
Project: Samza
Issue Type: Bug
Affects Versions: 0.10.1
Reporter: Neil Fordyce
If the recorded checkpoint of an input partition is higher than the highest
available offset for that input partition, then messages are not processed.
Message processing will not resume until the checkpoint is in range.
The initialization logs for the job indicate that the offset will be reset, but
it remains the same. In the run that produced the following logs, the job's
checkpoint topic repeatedly received new checkpoint messages for \[app,0\] with
offset 1097328067. Although new messages were being written to the partition,
the job processed none of them.
{code}
BrokerProxy [WARN] It appears that we received an invalid or empty offset
Some(1097328068) for [app,0]. Attempting to use Kafka's auto.offset.reset
setting. This can result in data loss if processing continues.
GetOffset [INFO] Checking if auto.offset.reset is defined for topic apps
GetOffset [INFO] Got reset of type largest.
{code}
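A minimal sketch of why the warning fires. It assumes the consumer resumes at the checkpointed offset plus one (consistent with 1097328067 being checkpointed and Some(1097328068) appearing in the log) and that the broker accepts fetch offsets from the earliest offset up to and including the log end offset. The class and method names are hypothetical, not Samza's code, and the earliest and log-end values in main() are made-up numbers standing in for a recreated topic.

{code:java}
// Hypothetical sketch, not Samza's implementation.
// Assumption: the next fetch offset is the last checkpointed offset + 1.
// Assumption: valid fetch offsets are [earliestOffset, logEndOffset], where
// logEndOffset is the offset the next produced message will receive.
public final class OffsetRangeCheck {
    private OffsetRangeCheck() {}

    // Offset the consumer would request next, given the last checkpointed offset.
    static long nextFetchOffset(long checkpointedOffset) {
        return checkpointedOffset + 1;
    }

    // True if a fetch at this offset would not be rejected as out of range.
    static boolean isInRange(long offset, long earliestOffset, long logEndOffset) {
        return offset >= earliestOffset && offset <= logEndOffset;
    }

    public static void main(String[] args) {
        long checkpoint = 1097328067L;            // checkpoint value from this report
        long next = nextFetchOffset(checkpoint);  // 1097328068, as in the BrokerProxy warning
        // Made-up offsets for a freshly recreated topic holding a few hundred messages.
        long earliest = 0L;
        long logEnd = 500L;
        System.out.println(next + " in range: " + isInRange(next, earliest, logEnd));
        // prints "1097328068 in range: false"
    }
}
{code}

With a recreated topic the log end offset restarts near zero, so any checkpoint from the old topic lands far above the valid range, which is the situation the warning describes.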
*Repro steps*
1. Start a job which checkpoints using Kafka
2. Kill the job
3. Delete the job's input topic (or overwrite the checkpoint with one which is
much larger than existing valid offsets)
4. Start producing into the deleted topic again
5. Start the job again
*Expected behaviour*
The checkpoint manager should overwrite the checkpoint with one which is in
range, as per the auto.offset.reset setting.
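The expected behaviour amounts to something like the following sketch: if the requested offset is outside the valid range, choose a new start offset according to auto.offset.reset (largest or smallest, the values the old Kafka consumer accepts and the value shown in the GetOffset log), and derive later checkpoints from that offset rather than re-writing the stale one. The names are hypothetical, not Samza's API, and the offsets in main() are illustrative only.

{code:java}
import java.util.Locale;

// Hypothetical sketch of the expected reset behaviour; not Samza's API.
public final class CheckpointReset {
    private CheckpointReset() {}

    // The two auto.offset.reset values understood by the old Kafka consumer.
    enum ResetPolicy { LARGEST, SMALLEST }

    static ResetPolicy parse(String autoOffsetReset) {
        switch (autoOffsetReset.trim().toLowerCase(Locale.ROOT)) {
            case "largest":
                return ResetPolicy.LARGEST;
            case "smallest":
                return ResetPolicy.SMALLEST;
            default:
                throw new IllegalArgumentException("Unsupported auto.offset.reset: " + autoOffsetReset);
        }
    }

    // Keeps an in-range offset unchanged; otherwise jumps to the end or the start
    // of the partition according to the policy. The returned offset, not the stale
    // checkpoint, is what subsequent checkpoints should be based on.
    static long resolveStartOffset(long requested, long earliest, long logEnd, ResetPolicy policy) {
        boolean inRange = requested >= earliest && requested <= logEnd;
        if (inRange) {
            return requested;
        }
        return policy == ResetPolicy.LARGEST ? logEnd : earliest;
    }

    public static void main(String[] args) {
        // Illustrative values: a requested offset far beyond a recreated topic's log end.
        long start = resolveStartOffset(1097328068L, 0L, 500L, parse("largest"));
        System.out.println(start); // prints 500
    }
}
{code}

In the observed behaviour the reset appears in the logs but the checkpoint keeps its old value; in this sketch the reset result is what would be carried forward.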
*Observed behaviour*
The job does not process any messages until the checkpointed offset is in range
again. The checkpoint is not reset to a valid offset, despite the logs indicating
that it will be. Instead, new checkpoint records with the same out-of-range
offset are repeatedly written to the checkpoint topic. The job appears to consume messages
according to [messages-read
metrics|https://samza.apache.org/learn/documentation/0.11/container/metrics-table.html#kafka-system-consumer-metrics],
but process() is not called.
Situations in which an offset reset can occur:
1. Topic deletion and recreation
2. Loss of several Kafka brokers resulting in partition loss