[ 
https://issues.apache.org/jira/browse/KAFKA-6121?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6121:
-----------------------------------
    Description: 
Streams uses three different consumers internally: the main consumer, a 
consumer for state restore (the restore consumer, also used by StandbyTasks), 
and a consumer for global state (used by GlobalStreamThread). While the main 
consumer handles InvalidOffsetException correctly, the restore and global 
consumers don't. Currently, they rely on auto.offset.reset with its default 
value "latest" -- thus, if an InvalidOffsetException occurs, we silently jump 
to the end of the changelog topic instead of properly handling this case.

An InvalidOffsetException can occur for two cases:
# A Kafka Streams application is offline for some time and on restart it reads 
its local offset file. This offset file might contain offsets that are no 
longer valid because the log got compacted in between.
# Even if we have a valid offset and do a seek, log compaction can kick in in 
the background at any point and invalidate our offset -- this is a rather rare 
race condition, but we need to handle it anyway.

For both cases, we can apply the same strategy: wipe out the local RocksDB 
store, seekToBeginning, and recreate the store from scratch. Thus, we need to 
set auto.offset.reset to "none" for each consumer and handle the 
InvalidOffsetException that might be thrown by poll() or position().
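The strategy above can be sketched with the plain consumer API (a minimal sketch, not actual Streams internals: the restore consumer is modeled as a bare KafkaConsumer, and wipeLocalStateStore() is a hypothetical helper standing in for deleting the task's RocksDB directory):

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RestoreLoopSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // "none" makes poll()/position() throw InvalidOffsetException for an
        // invalid offset instead of silently jumping to the latest offset.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

        try (KafkaConsumer<byte[], byte[]> restoreConsumer = new KafkaConsumer<>(props)) {
            // ... assign changelog partitions and seek to the checkpointed offsets ...
            try {
                final ConsumerRecords<byte[], byte[]> records =
                        restoreConsumer.poll(Duration.ofMillis(100));
                // ... apply records to the local state store ...
            } catch (final InvalidOffsetException e) {
                // The checkpointed offsets are gone (e.g. compacted away):
                // wipe the local store and replay the changelog from scratch.
                final Collection<TopicPartition> corrupted = e.partitions();
                wipeLocalStateStore(corrupted);            // hypothetical helper
                restoreConsumer.seekToBeginning(corrupted);
            }
        }
    }

    // Hypothetical helper: delete the local RocksDB state directories for the
    // given changelog partitions so the store can be rebuilt from offset 0.
    private static void wipeLocalStateStore(final Collection<TopicPartition> partitions) {
        // e.g. recursively delete each affected task's state directory on disk
    }
}
```

Note that InvalidOffsetException carries the affected partitions via partitions(), so only those stores need to be wiped and replayed.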

  was:
Streams uses three different consumers internally: the main consumer, a 
consumer for state restore (the restore consumer, also used by StandbyTasks), 
and a consumer for global state (used by GlobalStreamThread). While the main 
consumer handles InvalidOffsetException correctly, the restore and global 
consumers don't. Currently, they rely on auto.offset.reset with its default 
value "latest" -- thus, if an InvalidOffsetException occurs, we silently jump 
to the end of the changelog topic instead of properly handling this case.

An InvalidOffsetException can occur for two cases:
# A Kafka Streams application is offline for some time and on restart it reads 
its local offset file. This offset file might contain offsets that are no 
longer valid because the log got compacted in between.
# Even if we have a valid offset and do a seek, log compaction can kick in in 
the background at any point and invalidate our offset -- this is a rather rare 
race condition, but we need to handle it anyway.

For both cases, we can apply the same strategy: wipe out the local RocksDB 
store, seekToBeginning, and recreate the store from scratch.


> Restore and global consumer should not use auto.offset.reset
> ------------------------------------------------------------
>
>                 Key: KAFKA-6121
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6121
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Matthias J. Sax
>              Labels: streams-exception-handling
>             Fix For: 1.1.0, 1.0.1
>
>
> Streams uses three different consumers internally: the main consumer, a 
> consumer for state restore (the restore consumer, also used by StandbyTasks), 
> and a consumer for global state (used by GlobalStreamThread). While the main 
> consumer handles InvalidOffsetException correctly, the restore and global 
> consumers don't. Currently, they rely on auto.offset.reset with its default 
> value "latest" -- thus, if an InvalidOffsetException occurs, we silently jump 
> to the end of the changelog topic instead of properly handling this case.
> An InvalidOffsetException can occur for two cases:
> # A Kafka Streams application is offline for some time and on restart it 
> reads its local offset file. This offset file might contain offsets that are 
> no longer valid because the log got compacted in between.
> # Even if we have a valid offset and do a seek, log compaction can kick in in 
> the background at any point and invalidate our offset -- this is a rather 
> rare race condition, but we need to handle it anyway.
> For both cases, we can apply the same strategy: wipe out the local RocksDB 
> store, seekToBeginning, and recreate the store from scratch. Thus, we need to 
> set auto.offset.reset to "none" for each consumer and handle the 
> InvalidOffsetException that might be thrown by poll() or position().



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
