[ https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Phil Mikhailov updated KAFKA-6822:
----------------------------------
    Description:
We have microservices that use Kafka Streams and get stuck during initialization of the stream topology while filling a StateStore from Kafka using the Kafka consumer. The microservices are built with Kafka Streams 0.10.2.1-cp1 (Confluent 3.2.1), but the environment runs a Kafka 1.0.0 cluster (Confluent 4.0.0).

We reproduced this problem several times by restarting the microservices and eventually had to reset the stream offsets to the beginning in order to unblock them.

While investigating the problem more deeply, we found that the StateStore (0.10.2.1) gets stuck loading data in {{ProcessorStateManager}}. It uses KafkaConsumer (0.10.2.1) to fill the store, and the consumer calculates offsets like this:

Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
That is, it takes the offset of the last record returned by {{poll}} and adds 1, so the next offset is an estimate.

In Kafka 1.0.0, by contrast, the broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
This is the actual next offset rather than an estimate.

That said, we had a situation where the StateStore (0.10.2.1) got stuck loading data. The cause was in {{ProcessorStateManager.restoreActiveState:245}}, which kept spinning in the consumer loop because the following condition was never met:
{code:java}
} else if (restoreConsumer.position(storePartition) == endOffset) {
    break;
}
{code}

We assume that the 0.10.2.1 consumer estimates the end offset incorrectly when the topic is compacted. Alternatively, the offset calculations in 0.10.2.1 and 1.0.0 are inconsistent, which prevents a 0.10.2.1 consumer from being used with a 1.0.0 broker.

  was:
We faced a problem where the StateStore (0.10.2.1) got stuck loading data during startup of a microservice. Our configuration is Kafka 1.0.0, but the microservices are built with Kafka Streams 0.10.2.1.
We had to reset the stream offsets in order to unblock the microservices because restarts didn't help. We faced the problem only once and didn't have a chance to reproduce it, so we apologize in advance if the explanation is incomplete. Below are the details we managed to collect at the time.

Kafka consumer 0.10.2.1 calculates offsets like this:

Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
That is, it takes the offset of the last record returned by {{poll}} and adds 1, so the next offset is an estimate.

In Kafka 1.0.0, by contrast, the broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
This is the actual next offset rather than an estimate.

That said, we had a situation where the StateStore (0.10.2.1) got stuck loading data. The cause was in {{ProcessorStateManager.restoreActiveState:245}}, which kept spinning in the consumer loop because this condition was never met:
{code:java}
} else if (restoreConsumer.position(storePartition) == endOffset) {
    break;
}
{code}

We assume that the 0.10.2.1 consumer estimates the end offset incorrectly when the topic is compacted. Alternatively, the offset calculations in 0.10.2.1 and 1.0.0 are inconsistent, which prevents a 0.10.2.1 consumer from being used with a 1.0.0 broker.


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6822
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6822
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, streams
>            Reporter: Phil Mikhailov
>            Priority: Major
>
> We have microservices that use Kafka Streams and get stuck during
> initialization of the stream topology while filling a StateStore from Kafka
> using the Kafka consumer. The microservices are built with Kafka Streams
> 0.10.2.1-cp1 (Confluent 3.2.1), but the environment runs a Kafka 1.0.0
> cluster (Confluent 4.0.0).
> We reproduced this problem several times by restarting the microservices and
> eventually had to reset the stream offsets to the beginning in order to
> unblock them.
> While investigating the problem more deeply, we found that the StateStore
> (0.10.2.1) gets stuck loading data in {{ProcessorStateManager}}. It uses
> KafkaConsumer (0.10.2.1) to fill the store, and the consumer calculates
> offsets like this:
> Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> That is, it takes the offset of the last record returned by {{poll}} and
> adds 1, so the next offset is an estimate.
> In Kafka 1.0.0, by contrast, the broker explicitly provides the next offset
> to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> This is the actual next offset rather than an estimate.
> That said, we had a situation where the StateStore (0.10.2.1) got stuck
> loading data. The cause was in {{ProcessorStateManager.restoreActiveState:245}},
> which kept spinning in the consumer loop because the following condition was
> never met:
> {code:java}
> } else if (restoreConsumer.position(storePartition) == endOffset) {
>     break;
> }
> {code}
>
> We assume that the 0.10.2.1 consumer estimates the end offset incorrectly
> when the topic is compacted.
> Alternatively, the offset calculations in 0.10.2.1 and 1.0.0 are
> inconsistent, which prevents a 0.10.2.1 consumer from being used with a
> 1.0.0 broker.
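
To make the suspected mismatch concrete, here is a minimal, self-contained sketch of the hypothesis stated in the report. It is not Kafka code: the class {{OffsetEstimateSketch}}, its methods, the offset list, and the end offset of 8 are all hypothetical, chosen only to model a compacted changelog whose last surviving record sits below the reported end offset. It assumes no further records are returned once the position has been estimated.

```java
import java.util.Arrays;
import java.util.List;

public class OffsetEstimateSketch {

    // Mirrors the 0.10.2.1 expression quoted in the report:
    // offset of the last returned record, plus one.
    static long estimatedNextOffset(List<Long> recordOffsets) {
        return recordOffsets.get(recordOffsets.size() - 1) + 1;
    }

    // Mirrors the restore-loop exit test quoted in the report:
    // strict equality between the consumer position and the end offset.
    static boolean restoreLoopWouldExit(long position, long endOffset) {
        return position == endOffset;
    }

    public static void main(String[] args) {
        // Hypothetical compacted changelog: offsets 3, 4, 6 and 7 no longer exist.
        List<Long> survivingOffsets = Arrays.asList(0L, 1L, 2L, 5L);
        // Hypothetical end offset reported for the partition.
        long endOffset = 8L;

        long position = estimatedNextOffset(survivingOffsets);
        System.out.println("estimated position = " + position);
        System.out.println("loop exits = " + restoreLoopWouldExit(position, endOffset));
    }
}
```

Under these assumptions the estimated position is 6 while the end offset is 8, so the equality test never succeeds. Whether this is what actually happens between a 0.10.2.1 consumer and a 1.0.0 broker is exactly the open question of this issue.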