Bart Vercammen created KAFKA-6000:
-------------------------------------

             Summary: streams 0.10.2.1 - kafka 0.11.0.1 state restore not working
                 Key: KAFKA-6000
                 URL: https://issues.apache.org/jira/browse/KAFKA-6000
             Project: Kafka
          Issue Type: Bug
          Components: core, streams
    Affects Versions: 0.11.0.0, 0.10.2.1
            Reporter: Bart Vercammen
            Priority: Blocker


Potential interop issue between Kafka Streams (0.10.2.1) and Kafka (0.11.0.1)

{noformat}
11:24:16.416 [StreamThread-3] DEBUG rocessorStateManager - task [0_2] 
Registering state store lateststate to its state manager 
11:24:16.472 [StreamThread-3] TRACE rocessorStateManager - task [0_2] Restoring 
state store lateststate from changelog topic scratch.lateststate.dsh 
11:24:16.472 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Resetting offset for 
partition scratch.lateststate.dsh-2 to latest offset. 
11:24:16.472 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Partition 
scratch.lateststate.dsh-2 is unknown for fetching offset, wait for metadata 
refresh 
11:24:16.474 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Sending 
ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, 
partitionTimestamps={scratch.lateststate.dsh-2=-1}, minVersion=0) to broker 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.476 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 
ListOffsetResponse 
{responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=1773763}]}]}
 from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.476 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Handling 
ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 
1773763, timestamp -1 
11:24:16.477 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Resetting offset for 
partition scratch.lateststate.dsh-2 to earliest offset. 
11:24:16.478 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Sending 
ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, 
partitionTimestamps={scratch.lateststate.dsh-2=-2}, minVersion=0) to broker 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.480 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 
ListOffsetResponse 
{responses=[{topic=scratch.lateststate.dsh,partition_responses=[{partition=2,error_code=0,timestamp=-1,offset=0}]}]}
 from broker broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.481 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Handling 
ListOffsetResponse response for scratch.lateststate.dsh-2. Fetched offset 0, 
timestamp -1 
11:24:16.483 [StreamThread-3] DEBUG rocessorStateManager - restoring partition 
scratch.lateststate.dsh-2 from offset 0 to endOffset 1773763 
11:24:16.484 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch request 
for partition scratch.lateststate.dsh-2 at offset 0 to node 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.485 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
partitions [scratch.lateststate.dsh-2] to broker 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.486 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
partition scratch.lateststate.dsh-2 because there is an in-flight request to 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.490 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
record for partition scratch.lateststate.dsh-2 with offset 0 to buffered record 
list 
11:24:16.492 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 3 records 
in fetch response for partition scratch.lateststate.dsh-2 with offset 0 
11:24:16.493 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Returning fetched 
records at offset 0 for assigned partition scratch.lateststate.dsh-2 and update 
position to 1586527 
11:24:16.494 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Ignoring fetched 
records for scratch.lateststate.dsh-2 at offset 0 since the current position is 
1586527 
11:24:16.496 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch request 
for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.496 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
partitions [scratch.lateststate.dsh-2] to broker 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.498 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
partition scratch.lateststate.dsh-2 because there is an in-flight request to 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.499 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered 
record list 
11:24:16.500 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records 
in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527 
11:24:16.501 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch request 
for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.502 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
partitions [scratch.lateststate.dsh-2] to broker 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.511 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
partition scratch.lateststate.dsh-2 because there is an in-flight request to 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.512 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered 
record list 
11:24:16.512 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records 
in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527 
11:24:16.513 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch request 
for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.515 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
partitions [scratch.lateststate.dsh-2] to broker 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.517 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
partition scratch.lateststate.dsh-2 because there is an in-flight request to 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.518 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered 
record list 
11:24:16.519 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records 
in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527 
11:24:16.520 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Added fetch request 
for partition scratch.lateststate.dsh-2 at offset 1586527 to node 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.520 [StreamThread-3] DEBUG  o.a.k.c.c.i.Fetcher - Sending fetch for 
partitions [scratch.lateststate.dsh-2] to broker 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.522 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Skipping fetch for 
partition scratch.lateststate.dsh-2 because there is an in-flight request to 
broker-1.tt.kafka.marathon.mesos:9091 (id: 1002 rack: null) 
11:24:16.523 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Adding fetched 
record for partition scratch.lateststate.dsh-2 with offset 1586527 to buffered 
record list 
11:24:16.523 [StreamThread-3] TRACE  o.a.k.c.c.i.Fetcher - Received 0 records 
in fetch response for partition scratch.lateststate.dsh-2 with offset 1586527 
{noformat}

In this setup, I have 5 Kafka brokers running 0.11.0.1 (with SSL) and a 
Kafka Streams application running version 0.10.2.1.  The streams application 
uses an underlying state store (`lateststate`, backed by the changelog topic 
`scratch.lateststate.dsh`).  The problem I've seen is that when the Kafka 
Streams application (re)starts while a large amount of data is already present 
in the state stores, it does not restore the state.  KafkaStreams remains in 
the `REBALANCING` state and never exits the `restoreActiveState` function in 
`ProcessorStateManager`.
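
To make the symptom in the log concrete, here is a minimal, self-contained 
sketch of a restore loop that only finishes once the consumer position reaches 
the end offset.  It is a simplified model, not the actual 
`ProcessorStateManager` source: the class `RestoreLoopSketch`, the `restore` 
helper, the `poll` function and the `maxEmptyPolls` guard are all hypothetical 
names introduced for illustration.  The offsets (end offset 1773763, position 
stuck at 1586527) are taken from the log above.

```java
import java.util.function.LongUnaryOperator;

public class RestoreLoopSketch {

    /** Outcome of a bounded restore attempt (hypothetical helper type). */
    static final class Result {
        final boolean completed;
        final long finalPosition;
        final int emptyPolls;

        Result(boolean completed, long finalPosition, int emptyPolls) {
            this.completed = completed;
            this.finalPosition = finalPosition;
            this.emptyPolls = emptyPolls;
        }
    }

    /**
     * Models "poll until position reaches endOffset".
     * poll maps the current position to the position after one fetch;
     * returning the same position models a fetch with 0 records.
     * maxEmptyPolls is an assumption of this sketch so the demo terminates;
     * an unbounded loop would spin forever in the stalled case.
     */
    static Result restore(long startOffset, long endOffset,
                          LongUnaryOperator poll, int maxEmptyPolls) {
        long position = startOffset;
        int emptyPolls = 0;
        while (position < endOffset) {
            long next = poll.applyAsLong(position);
            if (next == position) {
                // No progress: the fetch returned 0 records.
                emptyPolls++;
                if (emptyPolls >= maxEmptyPolls) {
                    return new Result(false, position, emptyPolls);
                }
            } else {
                emptyPolls = 0;
                position = next;
            }
        }
        return new Result(true, position, emptyPolls);
    }

    public static void main(String[] args) {
        long endOffset = 1_773_763L; // end offset reported by ListOffsetResponse
        long stallAt = 1_586_527L;   // position after the first fetch in the log

        // Assumed broker behaviour mirroring the log: the first fetch moves the
        // position to stallAt, every later fetch returns 0 records.
        Result r = restore(0L, endOffset,
                pos -> pos < stallAt ? stallAt : pos, 5);

        System.out.println("completed=" + r.completed
                + " position=" + r.finalPosition
                + " emptyPolls=" + r.emptyPolls);
    }
}
```

With the offsets from the log, the position never reaches the end offset, so a 
loop without the guard would not terminate, which matches the observed hang.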

I also noticed that the state restore sometimes works when the number of 
records in the changelog topic is below roughly 100K.  I've seen a successful 
restore when the restore-consumer lag was below 100K records.

When running the exact same application on a 0.10.2.1 Kafka cluster, the issue 
never occurs.  It only happens when I run the 0.10.2.1 Kafka Streams 
application against a 0.11 Kafka cluster.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)