[https://issues.apache.org/jira/browse/KAFKA-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel]
Damien Gasparina updated KAFKA-14302:
-------------------------------------
Description:
If a store backed by a changelog topic has been fully emptied, it can trigger
an infinite series of probing rebalances.
The scenario is the following:
* A Kafka Streams application has a store with a changelog topic
* Many entries are written to the changelog, so its log end offset is high,
let's say 20,000
* The store is then emptied, either through data retention (windowing) or
tombstones
* An instance of the application is restarted
* It restores the store from the changelog, but does not write a checkpoint
file because there is no data to restore
* As there are no checkpoint entries, this instance reports a taskOffsetSums
offset of 0 in its subscriptionUserData
* During assignment, the group leader computes a lag of 20,000 (end offset
minus task offset), which is greater than the default acceptable lag
(acceptable.recovery.lag), so it schedules a probing rebalance
* In the next probing rebalance nothing has changed, so another probing
rebalance is scheduled, and so on indefinitely
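The lag check in the steps above can be modeled in a few lines of plain Java. This is a simplified sketch, not the actual StreamsPartitionAssignor or HighAvailabilityTaskAssignor code: the class `ProbingLagSketch` and its methods are hypothetical, it assumes one changelog partition per task (so the offset sum equals that partition's offset), and it hard-codes the default acceptable.recovery.lag of 10,000. It illustrates why the loop never ends: every probing rebalance sees the same inputs (end offset 20,000, reported offset 0).

```java
import java.util.Map;

// Hypothetical, simplified model of the leader's lag check; NOT the real assignor code.
final class ProbingLagSketch {

    // Default value of the acceptable.recovery.lag Streams config.
    static final long ACCEPTABLE_RECOVERY_LAG = 10_000L;

    // Lag the leader derives from a client's reported offset sum (never negative).
    static long lag(long endOffsetSum, long reportedOffsetSum) {
        return Math.max(0L, endOffsetSum - reportedOffsetSum);
    }

    // A client counts as caught up when its lag is within the acceptable lag.
    static boolean caughtUp(long endOffsetSum, long reportedOffsetSum) {
        return lag(endOffsetSum, reportedOffsetSum) <= ACCEPTABLE_RECOVERY_LAG;
    }

    // Offset a restarted client reports for a task: 0 when no checkpoint entry exists,
    // as described in the report.
    static long reportedOffsetSum(Map<String, Long> checkpoint, String task) {
        return checkpoint.getOrDefault(task, 0L);
    }

    public static void main(String[] args) {
        long endOffsetSum = 20_000L;             // changelog log end offset from the scenario
        Map<String, Long> checkpoint = Map.of(); // emptied store: nothing was checkpointed
        // Each probing rebalance evaluates identical inputs, so the outcome never changes.
        for (int round = 1; round <= 3; round++) {
            long reported = reportedOffsetSum(checkpoint, "0_0");
            System.out.printf("round %d: lag=%d caughtUp=%b%n",
                    round, lag(endOffsetSum, reported), caughtUp(endOffsetSum, reported));
        }
    }
}
```

Running `main` prints `lag=20000 caughtUp=false` for every round. Under the same model, a client reporting the restored end offset (20,000) instead of 0 would have a lag of 0 and count as caught up.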
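I was able to reproduce this locally with a simple topology:
{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;

var streamsBuilder = new StreamsBuilder();
// A stream-stream join is backed by windowed stores with changelog topics;
// window retention is what eventually empties them.
var table = streamsBuilder.stream("table");
streamsBuilder
    .stream("stream")
    .join(table, (eSt, eTb) -> eSt.toString() + eTb.toString(),
        JoinWindows.of(Duration.ofSeconds(5))) // deprecated since 3.0, kept as in the repro
    .to("output");{code}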
Due to this issue, applications with an empty changelog experience frequent
rebalances:
!image-2022-10-14-12-04-01-190.png!
With assignments similar to:
{code:java}
[hello-world-8323d214-4c56-470f-bace-e4291cdf10eb-StreamThread-3] INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - stream-thread [hello-world-8323d214-4c56-470f-bace-e4291cdf10eb-StreamThread-3-consumer] Assigned tasks [0_5, 0_4, 0_3, 0_2, 0_1, 0_0] including stateful [0_5, 0_4, 0_3, 0_2, 0_1, 0_0] to clients as:
d0e2d556-2587-48e8-b9ab-43a4e8207be6=[activeTasks: ([]) standbyTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 0_5])]
8323d214-4c56-470f-bace-e4291cdf10eb=[activeTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 0_5]) standbyTasks: ([])].{code}
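The assignment in the log places every task as a standby on the second client, so the reproduction presumably ran with at least one standby replica; this is an assumption, since the report does not list its configuration. Below is a hedged sketch of properties for a local reproduction. The class `ReproConfigSketch`, the broker address, the String serdes, and the shortened probing interval are all assumptions; only the property names are real Kafka Streams settings.

```java
import java.util.Properties;

// Hypothetical helper building properties for a local reproduction; values are assumptions.
final class ReproConfigSketch {

    static Properties reproProps() {
        Properties props = new Properties();
        // "hello-world" matches the thread-name prefix in the log; the broker address is assumed.
        props.put("application.id", "hello-world");
        props.put("bootstrap.servers", "localhost:9092");
        // Assumption: String serdes for keys and values.
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        // Assumption: one standby replica, consistent with the standbyTasks in the assignment.
        props.put("num.standby.replicas", "1");
        // Default threshold; a reported lag of 20,000 exceeds it.
        props.put("acceptable.recovery.lag", "10000");
        // Shortened from the 10-minute default so probing rebalances show up sooner.
        props.put("probing.rebalance.interval.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        // Print each setting; Properties iteration order is not guaranteed.
        reproProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

These properties would be passed to the `KafkaStreams` constructor together with the topology. 60,000 ms is the lowest value Kafka Streams accepts for probing.rebalance.interval.ms; with the default, each iteration of the loop takes ten minutes to appear.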
> Infinite probing rebalance if a changelog topic got emptied
> -----------------------------------------------------------
>
> Key: KAFKA-14302
> URL: https://issues.apache.org/jira/browse/KAFKA-14302
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.3.1
> Reporter: Damien Gasparina
> Priority: Major
> Attachments: image-2022-10-14-12-04-01-190.png
--
This message was sent by Atlassian Jira
(v8.20.10#820010)