[ 
https://issues.apache.org/jira/browse/KAFKA-6822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Phil Mikhailov updated KAFKA-6822:
----------------------------------
    Description: 
We have microservices that use Kafka Streams and get stuck while initializing 
the stream topology, when the StateStore is being filled from Kafka by a Kafka 
consumer. The microservices are built with Kafka Streams 0.10.2.1-cp1 
(Confluent 3.2.1), but the environment runs a Kafka 1.0.0 cluster 
(Confluent 4.0.0). 

We reproduced this problem several times by restarting the microservices and 
eventually had to reset the stream offsets to the beginning in order to unblock 
them.


While investigating this problem more deeply, we found that the StateStore 
(0.10.2.1) gets stuck loading data in {{ProcessorStateManager}}. It uses 
KafkaConsumer (0.10.2.1) to fill the store, and the consumer calculates offsets 
like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
That is, it takes the offset of the last record returned by {{poll}} and adds 1.
 So the next offset is estimated.

In Kafka 1.0.0, by contrast, the broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset rather than an estimate.
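
A minimal, self-contained sketch of the difference between the two calculations (plain Java, no Kafka dependency). The class {{OffsetEstimateSketch}}, the {{Batch}} type and the offsets are hypothetical and chosen only for illustration. It assumes, as this report suspects, that the next offset to fetch can lie beyond the last returned record's offset plus 1.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these types exist in Kafka.
final class OffsetEstimateSketch {

    // Stand-in for one fetched batch: the offsets of the records actually
    // returned, plus the next offset to fetch that came with the batch.
    static final class Batch {
        final List<Long> recordOffsets;
        final long nextFetchOffset;

        Batch(List<Long> recordOffsets, long nextFetchOffset) {
            this.recordOffsets = recordOffsets;
            this.nextFetchOffset = nextFetchOffset;
        }
    }

    // 0.10.2.1-style calculation: offset of the last returned record plus 1.
    static long estimatedNextOffset(Batch batch) {
        return batch.recordOffsets.get(batch.recordOffsets.size() - 1) + 1;
    }

    // 1.0.0-style calculation: use the explicitly provided next offset.
    static long reportedNextOffset(Batch batch) {
        return batch.nextFetchOffset;
    }

    public static void main(String[] args) {
        // Assumed scenario: records 5..7 are returned, but the next offset
        // to fetch is 10 (offsets 8 and 9 yield no records).
        Batch batch = new Batch(Arrays.asList(5L, 6L, 7L), 10L);
        System.out.println("estimated next offset = " + estimatedNextOffset(batch));
        System.out.println("reported next offset  = " + reportedNextOffset(batch));
    }
}
```

With these made-up offsets the two calculations disagree (8 versus 10), which is the kind of gap this report suspects.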

That said, we had a situation where the StateStore (0.10.2.1) got stuck loading 
data. The cause was in {{ProcessorStateManager.restoreActiveState:245}}, which 
kept spinning in the consumer loop because the following condition was never met:
{code:java}
       } else if (restoreConsumer.position(storePartition) == endOffset) {
           break;
       }
{code}
 
We assume that consumer 0.10.2.1 estimates the end offset incorrectly when the 
topic is compacted. 
 Alternatively, offsets are calculated inconsistently between 0.10.2.1 and 
1.0.0, which prevents a 0.10.2.1 consumer from being used with a 1.0.0 broker.
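
The suspected failure mode can be modelled with a small simulation (plain Java, no Kafka dependency). Everything here is hypothetical: the class {{RestoreLoopSketch}}, the surviving offsets and the end offset are invented, and the model assumes that the end offset lies beyond the last returned record's offset plus 1. It is a simplified sketch of the restore loop's exit condition, not the actual {{ProcessorStateManager}} code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not the Kafka Streams implementation.
final class RestoreLoopSketch {

    // Assumed changelog contents: offsets of records that can still be read.
    static final List<Long> SURVIVING = Arrays.asList(0L, 1L, 2L, 5L, 6L, 7L);

    // Assumed end offset, past the last readable record's offset plus 1.
    static final long END_OFFSET = 10L;

    // Simulates the restore loop. Returns the number of polls needed until
    // position == END_OFFSET, or -1 if that never happens within maxPolls.
    static int restore(boolean useReportedNextOffset, int maxPolls) {
        long position = 0L;
        for (int poll = 1; poll <= maxPolls; poll++) {
            // Each simulated poll returns every readable record at or after
            // the current position; remember the last one seen.
            long lastSeen = -1L;
            for (long offset : SURVIVING) {
                if (offset >= position) {
                    lastSeen = offset;
                }
            }
            if (useReportedNextOffset) {
                // 1.0.0-style: jump to the explicitly provided next offset.
                position = END_OFFSET;
            } else if (lastSeen >= 0) {
                // 0.10.2.1-style: estimate from the last returned record.
                position = lastSeen + 1;
            }
            // Same exit condition as in the restore loop quoted above.
            if (position == END_OFFSET) {
                return poll;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("estimated: " + restore(false, 100));
        System.out.println("reported:  " + restore(true, 100));
    }
}
```

In this model the estimated position stops at 8 and never equals the end offset of 10, so the loop only ends when the poll limit is hit, while the reported offset satisfies the condition on the first poll.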

  was:
We faced a problem where the StateStore (0.10.2.1) got stuck loading data during 
microservice startup.
 Our configuration is Kafka 1.0.0, but the microservices are built with Kafka 
Streams 0.10.2.1.
 We had to reset the stream offsets in order to unblock the microservices because 
restarts didn't help.

We faced the problem only once and didn't have a chance to reproduce it, so 
we apologize in advance if the explanation is incomplete.

Below are the details we managed to collect at the time:

Kafka consumer 0.10.2.1 calculates offsets like this:
 Fetcher:524
{code:java}
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
{code}
That is, it takes the offset of the last record returned by {{poll}} and adds 1.
 So the next offset is estimated.

In Kafka 1.0.0, by contrast, the broker explicitly provides the next offset to fetch:
{code:java}
long nextOffset = partitionRecords.nextFetchOffset;
{code}
It returns the actual next offset rather than an estimate.

 

That said, we had a situation where the StateStore (0.10.2.1) got stuck loading 
data. The cause was in {{ProcessorStateManager.restoreActiveState:245}}, which 
kept spinning in the consumer loop because this condition was never met:
{code:java}
       } else if (restoreConsumer.position(storePartition) == endOffset) {
           break;
       }
{code}
 

We assume that consumer 0.10.2.1 estimates the end offset incorrectly when the 
topic is compacted. 
 Alternatively, offsets are calculated inconsistently between 0.10.2.1 and 
1.0.0, which prevents a 0.10.2.1 consumer from being used with a 1.0.0 broker.


> Kafka Consumer 0.10.2.1 can not normally read data from Kafka 1.0.0
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6822
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6822
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, streams
>            Reporter: Phil Mikhailov
>            Priority: Major
>
> We have microservices that use Kafka Streams and get stuck while initializing 
> the stream topology, when the StateStore is being filled from Kafka by a Kafka 
> consumer. The microservices are built with Kafka Streams 0.10.2.1-cp1 
> (Confluent 3.2.1), but the environment runs a Kafka 1.0.0 cluster 
> (Confluent 4.0.0). 
> We reproduced this problem several times by restarting the microservices and 
> eventually had to reset the stream offsets to the beginning in order to 
> unblock them.
> While investigating this problem more deeply, we found that the StateStore 
> (0.10.2.1) gets stuck loading data in {{ProcessorStateManager}}. It uses 
> KafkaConsumer (0.10.2.1) to fill the store, and the consumer calculates 
> offsets like this:
>  Fetcher:524
> {code:java}
> long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
> {code}
> That is, it takes the offset of the last record returned by {{poll}} and adds 1.
>  So the next offset is estimated.
> In Kafka 1.0.0, by contrast, the broker explicitly provides the next offset to fetch:
> {code:java}
> long nextOffset = partitionRecords.nextFetchOffset;
> {code}
> It returns the actual next offset rather than an estimate.
> That said, we had a situation where the StateStore (0.10.2.1) got stuck 
> loading data. The cause was in {{ProcessorStateManager.restoreActiveState:245}}, 
> which kept spinning in the consumer loop because the following condition was 
> never met:
> {code:java}
>        } else if (restoreConsumer.position(storePartition) == endOffset) {
>            break;
>        }
> {code}
>  
> We assume that consumer 0.10.2.1 estimates the end offset incorrectly when the 
> topic is compacted. 
>  Alternatively, offsets are calculated inconsistently between 0.10.2.1 and 
> 1.0.0, which prevents a 0.10.2.1 consumer from being used with a 1.0.0 broker.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
