[ https://issues.apache.org/jira/browse/SAMZA-1169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940963#comment-15940963 ]
Neil Fordyce commented on SAMZA-1169:
-------------------------------------
We managed to work around this issue by using the checkpoint tool to set the
checkpoint to a valid offset. However, it would be preferable for the
checkpoint to be reset automatically during job initialization, according to
the auto.offset.reset setting.
> Messages not processed if checkpoint offset is above valid range
> ----------------------------------------------------------------
>
> Key: SAMZA-1169
> URL: https://issues.apache.org/jira/browse/SAMZA-1169
> Project: Samza
> Issue Type: Bug
> Affects Versions: 0.10.1
> Reporter: Neil Fordyce
>
> If the recorded checkpoint of an input partition is higher than the highest
> available offset for that input partition, then messages are not processed.
> Message processing does not resume until the checkpoint is back in range.
> The job's initialization logs indicate that the offset will be reset, but the
> offset stays the same. In the case of the following logs, the job's
> checkpoint topic repeatedly received new checkpoint messages for [app,0]
> with offset 1097328067. Although new messages were being written to the
> partition, the job did not process any of them.
> {code}
> BrokerProxy [WARN] It appears that we received an invalid or empty offset
> Some(1097328068) for [app,0]. Attempting to use Kafka's auto.offset.reset
> setting. This can result in data loss if processing continues.
> GetOffset [INFO] Checking if auto.offset.reset is defined for topic apps
> GetOffset [INFO] Got reset of type largest.
> {code}
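> The WARN line above says the consumer falls back on auto.offset.reset when the requested offset is invalid. As a minimal sketch of what that fallback would be expected to do, assuming the old Kafka consumer values `smallest`/`largest`, that a partition's valid fetch range runs from its earliest retained offset up to its log-end offset, and that the job resumes at checkpoint + 1 (as Some(1097328068) for checkpoint 1097328067 in the log suggests). `resolve_start_offset` is a hypothetical helper, not Samza's code:

```python
from typing import Optional, Tuple


def resolve_start_offset(
    checkpointed: Optional[int],
    earliest: int,
    log_end: int,
    reset_policy: str,
) -> Tuple[int, bool]:
    """Pick the offset to resume fetching from for one partition.

    Hypothetical helper, not Samza's implementation.
    checkpointed: last processed offset from the checkpoint, or None.
    earliest:     oldest offset the broker still retains.
    log_end:      offset the next produced message will receive.
    reset_policy: "smallest" or "largest" (old Kafka consumer values).
    Returns (offset, was_reset).
    """
    if reset_policy not in ("smallest", "largest"):
        raise ValueError(f"unsupported auto.offset.reset: {reset_policy!r}")
    # Assumption: the job resumes at the offset after the last processed one.
    resume = None if checkpointed is None else checkpointed + 1
    # resume == log_end is still valid: the consumer just waits for new data.
    if resume is not None and earliest <= resume <= log_end:
        return resume, False
    # Out of range (or no checkpoint): fall back to the configured policy.
    return (earliest if reset_policy == "smallest" else log_end), True


# Scenario like the report: checkpoint 1097328067, recreated topic holding 5000 messages.
print(resolve_start_offset(1097328067, 0, 5000, "largest"))  # (5000, True)
```

> Per the expected behaviour below, once `was_reset` is true the new position would also be written back to the checkpoint, so later checkpoint records stop repeating the stale offset.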
> *Repro steps*
> 1. Start a job which checkpoints using Kafka
> 2. Kill the job
> 3. Delete the job's input topic (or overwrite the checkpoint with one which
> is much larger than existing valid offsets)
> 4. Start producing into the deleted topic again
> 5. Start the job again
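> Step 3 leaves the stored checkpoint far beyond the recreated topic's offsets, which start again at 0. A back-of-the-envelope sketch of how long the stall would last, assuming the job resumes at checkpoint + 1 and that this offset only becomes fetchable once it no longer exceeds the partition's log-end offset. `messages_until_in_range` is a hypothetical helper:

```python
def messages_until_in_range(checkpointed: int, log_end: int) -> int:
    """Messages still to be produced before checkpointed + 1 is fetchable.

    Hypothetical helper; assumes resume offset = checkpointed + 1 and that
    a resume offset is valid only while it does not exceed log_end.
    """
    return max(0, checkpointed + 1 - log_end)


# Checkpoint from the report against a freshly recreated topic.
print(messages_until_in_range(1097328067, 0))        # 1097328068
print(messages_until_in_range(1097328067, 1000000))  # 1096328068
```

> Under the same assumptions, even after the gap closes the job would resume at offset 1097328068, so every message written to the recreated topic before that offset would never be processed.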
> *Expected behaviour*
> The checkpoint manager should overwrite the checkpoint with one which is in
> range, as per the auto.offset.reset setting.
> *Observed behaviour*
> The job does not process any messages until the checkpointed offset is in
> range again. The checkpoint is not reset to a valid offset, despite logs
> indicating it will be. Repeated new checkpoint records are written to the
> checkpoint topic with the same out-of-range offset. The job appears to
> consume messages according to [messages-read
> metrics|https://samza.apache.org/learn/documentation/0.11/container/metrics-table.html#kafka-system-consumer-metrics],
> but process() is not called.
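> The symptom above (messages being read while process() is never invoked) can be spotted by comparing two metric snapshots taken some interval apart. A minimal sketch, assuming per-container counters for messages read and for messages delivered to process() are available; the keys `messages_read` and `messages_processed` are placeholders, not Samza's actual metric names:

```python
from typing import Mapping


def looks_stalled(before: Mapping[str, int], after: Mapping[str, int]) -> bool:
    """Flag a container that reads messages but never hands them to process().

    before/after: counter snapshots. The keys "messages_read" and
    "messages_processed" are hypothetical placeholders.
    """
    read_delta = after["messages_read"] - before["messages_read"]
    processed_delta = after["messages_processed"] - before["messages_processed"]
    # Reading without processing matches the behaviour described in this report.
    return read_delta > 0 and processed_delta == 0


snapshot_1 = {"messages_read": 10, "messages_processed": 5}
snapshot_2 = {"messages_read": 50, "messages_processed": 5}
print(looks_stalled(snapshot_1, snapshot_2))  # True
```

> An idle partition (no new reads) is deliberately not flagged, since no processing is expected there.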
> Situations in which an offset reset may be needed:
> 1. Topic deletion and recreation
> 2. Loss of several Kafka brokers resulting in partition loss
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)