[ https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Phil Mikhailov updated KAFKA-6822: ---------------------------------- Description: Kafka consumer 0.10.2.1 calculates offsets like this: Fetcher:524 {code:java} long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1; {code} Get the latest offset from records (which were got from {{poll}}) plus 1. So the next offset is estimated. In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch: {code:java} long nextOffset = partitionRecords.nextFetchOffset; {code} It returns the actual next offset but not estimated. This is the reason why {{ProcessorStateManager.restoreActiveState:245}} keeps spinning in consumer loop 'cause this condition will never happen: {code:java} } else if (restoreConsumer.position(storePartition) == endOffset) { break; } {code} was: Kafka consumer 0.10.2.1 calculates offsets like this: Fetcher:524 {code:java} long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1; {code} Get the latest offset from records (which were got from {{poll}}) plus 1. So the next offset is estimated. In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch: {code:java} long nextOffset = partitionRecords.nextFetchOffset; {code} It returns the actual next offset but not estimated. This is the reason why {{ProcessorStateManager.restoreActiveState:245}} keeps spinning in consumer loop 'cause this condition will never happen: {code:java} } else if (restoreConsumer.position(storePartition) == endOffset) { break; } {code} > Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0 > ------------------------------------------------------------------- > > Key: KAFKA-6822 > URL: https://issues.apache.org/jira/browse/KAFKA-6822 > Project: Kafka > Issue Type: Bug > Components: consumer > Reporter: Phil Mikhailov > Priority: Major > > Kafka consumer 0.10.2.1 calculates offsets like this: > Fetcher:524 > {code:java} > long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1; > {code} > Get the latest offset from records (which were got from {{poll}}) plus 1. > So the next offset is estimated. > In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch: > {code:java} > long nextOffset = partitionRecords.nextFetchOffset; > {code} > It returns the actual next offset but not estimated. > This is the reason why {{ProcessorStateManager.restoreActiveState:245}} keeps > spinning in consumer loop 'cause this condition will never happen: > {code:java} > } else if (restoreConsumer.position(storePartition) == endOffset) { > break; > } > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)