[ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Mikhailov updated KAFKA-6822:
----------------------------------
    Description: 
We have microservices that use Kafka Streams and get stuck while initializing 
the stream topology, at the point where a StateStore is filled from Kafka using 
KafkaConsumer. The microservices are built with Kafka Streams 0.10.2.1-cp1 
(Confluent 3.2.1), but the environment runs a Kafka 1.0.0 cluster 
(Confluent 4.0.0). 

We reproduced this problem several times by restarting the microservices and 
eventually had to reset the stream offsets to the beginning in order to unblock 
them.


While investigating this problem more deeply, we found that the StateStore 
(0.10.2.1) gets stuck while loading data through {{ProcessorStateManager}}. It 
uses KafkaConsumer (0.10.2.1) to fill the store, and the consumer calculates 
offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
That is, it takes the offset of the last record returned by {{poll}} and adds 1, 
so the next offset is only estimated.

In Kafka 1.0.0, by contrast, the broker explicitly provides the next offset to 
fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset rather than an estimate.

As a result, the StateStore (0.10.2.1) got stuck while loading data. The cause 
was in {{ProcessorStateManager.restoreActiveState:245}}, which kept spinning in 
the consumer loop because the following condition was never met:
{code:java}
       } else if (restoreConsumer.position(storePartition) == endOffset) {
           break;
       }
{code}
 
We assume that the 0.10.2.1 consumer estimates the end offset incorrectly when 
the topic is compacted. 
 Alternatively, there is an inconsistency in offset calculation between 0.10.2.1 
and 1.0.0 that prevents using a 0.10.2.1 consumer with a 1.0.0 broker.
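
The hypothesis above can be illustrated with a small stand-alone sketch. It 
does not use the Kafka client at all: the class {{OffsetGapSketch}}, its 
methods, the sample offsets and the end offset of 10 are all hypothetical and 
only model the two offset calculations and the exit check quoted in this 
report. It assumes a changelog where the last offsets before the log end no 
longer hold records, which is one way the estimated position could fall short 
of the end offset.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative model only; none of these names exist in Kafka.
public class OffsetGapSketch {

    // 0.10.2.1-style estimate: offset of the last returned record plus one.
    public static long estimatedNextOffset(List<Long> recordOffsets) {
        return recordOffsets.get(recordOffsets.size() - 1) + 1;
    }

    // Same comparison as the restore-loop exit check quoted in this report.
    public static boolean restoreLoopWouldExit(long position, long endOffset) {
        return position == endOffset;
    }

    public static void main(String[] args) {
        // Assumed sample data: offsets 3, 4, 8 and 9 hold no records any more.
        List<Long> fetched = Arrays.asList(0L, 1L, 2L, 5L, 6L, 7L);
        long endOffset = 10L; // assumed log end offset

        // Position as the old consumer would estimate it from the records.
        long estimated = estimatedNextOffset(fetched);

        // Stand-in for a next fetch offset supplied with the fetched data
        // (assumption: it points at the log end).
        long provided = endOffset;

        System.out.println("estimated position = " + estimated
                + ", loop exits: " + restoreLoopWouldExit(estimated, endOffset));
        System.out.println("provided position  = " + provided
                + ", loop exits: " + restoreLoopWouldExit(provided, endOffset));
    }
}
```

With this sample data the estimated position is 8, so the equality check 
against 10 never succeeds and no further records arrive to advance it, while 
the provided position of 10 satisfies the check.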

  was:
We have a microservices that use Kafka Streams which stuck in initialization of 
stream topolgy while filling StateStore from Kafka using KafkaConsumer. 
Microservice is build with Kafka Streams 0.10.2.1-cp1 (Confluent 3.2.1) but 
environment runs Kafka cluster 1.0.0 (Confluent 4.0.0). 

We reproduced this problem several time by restarting microservices and 
eventually had to reset the stream offsets to beginning in order unblock 
microservices.


While investigating this problem more deeply we found out that  StateStore 
(0.10.2.1) stuck in loading data using {{ProcessorStateManager}}. It uses 
KafkaConsumer (0.10.2.1) to fill the store and it calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
Get the latest offset from records (which were got from {{poll}}) plus 1.
 So the next offset is estimated.

In Kafka 1.0.0 otherwise broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset but not estimated.

That said, we had a situation when StateStore (0.10.2.1) stuck in loading data. 
The reason was in {{ProcessorStateManager.restoreActiveState:245}} which kept 
spinning in consumer loop 'cause the following condition never happened:
{code:java}
       } else if (restoreConsumer.position(storePartition) == endOffset) {
           break;
       }
{code}
 
We assume that consumer 0.10.2.1 estimates endoffset incorrectly in a case of 
compaction. 
 Or there is inconsistency between offsets calculation between 0.10.2.1 and 
1.0.0 which doesn't allow to use 0.10.2.1 consumer with 1.0.0 broker.


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6822
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6822
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, streams
>            Reporter: Phil Mikhailov
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
