Chris Pettitt created KAFKA-8831:
------------------------------------
Summary: Joining a new instance sometimes does not cause rebalancing
Key: KAFKA-8831
URL: https://issues.apache.org/jira/browse/KAFKA-8831
Project: Kafka
Issue Type: Bug
Reporter: Chris Pettitt
Assignee: Chris Pettitt
Attachments: StandbyTaskTest.java
See the log below. The second instance joins shortly after the first
(~250ms later). The group coordinator logs that it is preparing to rebalance,
but no rebalance follows. The first instance keeps all partitions (2).
```
[2019-08-23 17:12:05,756] INFO [Consumer clientId=consumer-1, groupId=consumerApp] Subscribed to topic(s): output-topic (org.apache.kafka.clients.consumer.KafkaConsumer)
[2019-08-23 17:12:05,756] INFO [Consumer clientId=consumer-1, groupId=consumerApp] Subscribed to topic(s): output-topic (org.apache.kafka.clients.consumer.KafkaConsumer)
[2019-08-23 17:12:05,757] INFO [Consumer clientId=streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-consumer, groupId=streamsApp] Discovered group coordinator localhost:57756 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-08-23 17:12:05,760] INFO [Consumer clientId=consumer-1, groupId=consumerApp] Discovered group coordinator localhost:57756 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-08-23 17:12:05,760] INFO [Consumer clientId=streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-consumer, groupId=streamsApp] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-08-23 17:12:05,761] INFO [Consumer clientId=consumer-1, groupId=consumerApp] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-08-23 17:12:05,781] INFO [Consumer clientId=streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-consumer, groupId=streamsApp] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-08-23 17:12:05,781] INFO [Consumer clientId=consumer-1, groupId=consumerApp] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-08-23 17:12:05,788] INFO [GroupCoordinator 0]: Preparing to rebalance group streamsApp in state PreparingRebalance with old generation 0 (__consumer_offsets-6) (reason: Adding new member streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-consumer-35501476-e96b-48b9-90d2-e98716e7be56 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2019-08-23 17:12:05,788] INFO [GroupCoordinator 0]: Preparing to rebalance group consumerApp in state PreparingRebalance with old generation 0 (__consumer_offsets-5) (reason: Adding new member consumer-1-afda303e-7b9b-43e3-97a2-e689c10b7fad with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2019-08-23 17:12:05,793] INFO [GroupCoordinator 0]: Stabilized group streamsApp generation 1 (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)
[2019-08-23 17:12:05,795] INFO [GroupCoordinator 0]: Stabilized group consumerApp generation 1 (__consumer_offsets-5) (kafka.coordinator.group.GroupCoordinator)
[2019-08-23 17:12:05,798] WARN Unable to assign 1 of 1 standby tasks for task [0_0]. There is not enough available capacity. You should increase the number of threads and/or application instances to maintain the requested number of standby replicas. (org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor)
[2019-08-23 17:12:05,798] WARN Unable to assign 1 of 1 standby tasks for task [0_1]. There is not enough available capacity. You should increase the number of threads and/or application instances to maintain the requested number of standby replicas. (org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor)
[2019-08-23 17:12:05,798] INFO stream-thread [streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-consumer] Assigned tasks to clients as {581aeca8-9139-4575-8b05-a72a128e2645=[activeTasks: ([0_0, 0_1]) standbyTasks: ([]) assignedTasks: ([0_0, 0_1]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) capacity: 1]}. (org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor)
[2019-08-23 17:12:05,799] INFO [GroupCoordinator 0]: Assignment received from leader for group consumerApp for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2019-08-23 17:12:05,800] INFO [GroupCoordinator 0]: Assignment received from leader for group streamsApp for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2019-08-23 17:12:05,815] INFO [Consumer clientId=streamsApp-1e4ee8e2-e5fb-4571-a25a-0084b3c0a4ca-StreamThread-1-consumer, groupId=streamsApp] Discovered group coordinator localhost:57756 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-08-23 17:12:05,817] INFO [Consumer clientId=consumer-1, groupId=consumerApp] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-08-23 17:12:05,817] INFO [Consumer clientId=streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-consumer, groupId=streamsApp] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-08-23 17:12:05,817] INFO [Consumer clientId=streamsApp-1e4ee8e2-e5fb-4571-a25a-0084b3c0a4ca-StreamThread-1-consumer, groupId=streamsApp] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-08-23 17:12:05,821] INFO [Consumer clientId=streamsApp-1e4ee8e2-e5fb-4571-a25a-0084b3c0a4ca-StreamThread-1-consumer, groupId=streamsApp] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2019-08-23 17:12:05,823] INFO [GroupCoordinator 0]: Preparing to rebalance group streamsApp in state PreparingRebalance with old generation 1 (__consumer_offsets-6) (reason: Adding new member streamsApp-1e4ee8e2-e5fb-4571-a25a-0084b3c0a4ca-StreamThread-1-consumer-887ce0e4-fc16-4287-b0ce-cc6ab1301d97 with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2019-08-23 17:12:05,824] INFO [Consumer clientId=consumer-1, groupId=consumerApp] Adding newly assigned partitions: output-topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2019-08-23 17:12:05,824] INFO [Consumer clientId=streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-consumer, groupId=streamsApp] Adding newly assigned partitions: input-topic-0, input-topic-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2019-08-23 17:12:05,824] INFO stream-thread [streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1] State transition from STARTING to PARTITIONS_ASSIGNED (org.apache.kafka.streams.processor.internals.StreamThread)
[2019-08-23 17:12:05,836] INFO [Consumer clientId=consumer-1, groupId=consumerApp] Found no committed offset for partition output-topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2019-08-23 17:12:05,839] INFO stream-thread [streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1] partition assignment took 15 ms. current active tasks: [0_0, 0_1] current standby tasks: [] previous active tasks: [] (org.apache.kafka.streams.processor.internals.StreamThread)
[2019-08-23 17:12:05,840] INFO [Consumer clientId=streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-consumer, groupId=streamsApp] Found no committed offset for partition input-topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2019-08-23 17:12:05,840] INFO [Consumer clientId=streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-consumer, groupId=streamsApp] Found no committed offset for partition input-topic-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2019-08-23 17:12:05,851] INFO [Consumer clientId=consumer-1, groupId=consumerApp] Resetting offset for partition output-topic-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2019-08-23 17:12:05,851] INFO [Consumer clientId=streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-consumer, groupId=streamsApp] Resetting offset for partition input-topic-0 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2019-08-23 17:12:05,851] INFO [Consumer clientId=streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-consumer, groupId=streamsApp] Resetting offset for partition input-topic-1 to offset 0. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2019-08-23 17:12:05,854] INFO [Consumer clientId=streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-consumer, groupId=streamsApp] Found no committed offset for partition input-topic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2019-08-23 17:12:05,944] INFO Opening store source-table in regular mode (org.apache.kafka.streams.state.internals.RocksDBTimestampedStore)
[2019-08-23 17:12:05,948] INFO [Consumer clientId=streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-consumer, groupId=streamsApp] Found no committed offset for partition input-topic-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2019-08-23 17:12:05,960] INFO Opening store source-table in regular mode (org.apache.kafka.streams.state.internals.RocksDBTimestampedStore)
[2019-08-23 17:12:05,967] INFO [Consumer clientId=streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions (org.apache.kafka.clients.consumer.KafkaConsumer)
[2019-08-23 17:12:05,972] INFO [Consumer clientId=streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1-restore-consumer, groupId=null] Unsubscribed all topics or patterns and assigned partitions (org.apache.kafka.clients.consumer.KafkaConsumer)
[2019-08-23 17:12:05,972] INFO stream-thread [streamsApp-581aeca8-9139-4575-8b05-a72a128e2645-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING (org.apache.kafka.streams.processor.internals.StreamThread)
[2019-08-23 17:12:05,972] INFO stream-client [streamsApp-581aeca8-9139-4575-8b05-a72a128e2645] State transition from REBALANCING to RUNNING (org.apache.kafka.streams.KafkaStreams)
```
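For anyone triaging a similar log, here is a rough sketch of how the symptom
could be spotted mechanically: it looks for a "Preparing to rebalance" message
from the group coordinator that is never followed by a "Stabilized group"
message with a higher generation. The function name `pending_rebalances` and
the abridged sample lines are my own for illustration (the "(reason: ...)"
parts are cut); the regexes only assume the message wording seen in the log
above and may not match other broker versions.

```python
import re

# Patterns based only on the coordinator message wording in the log above.
PREPARING = re.compile(
    r"Preparing to rebalance group (\S+) in state \S+ with old generation (\d+)"
)
STABILIZED = re.compile(r"Stabilized group (\S+) generation (\d+)")


def pending_rebalances(log_text):
    """Return {group: old_generation} for rebalances announced but never stabilized."""
    # Collapse whitespace so the patterns also match hard-wrapped log text.
    text = " ".join(log_text.split())
    events = []
    for m in PREPARING.finditer(text):
        events.append((m.start(), "prepare", m.group(1), int(m.group(2))))
    for m in STABILIZED.finditer(text):
        events.append((m.start(), "stable", m.group(1), int(m.group(2))))

    pending = {}
    for _, kind, group, generation in sorted(events):  # order of appearance
        if kind == "prepare":
            pending[group] = generation
        elif group in pending and generation > pending[group]:
            # A newer generation stabilized, so the announced rebalance completed.
            del pending[group]
    return pending


# Abridged lines taken from the log above (hypothetical shortening).
sample = """
[2019-08-23 17:12:05,788] INFO [GroupCoordinator 0]: Preparing to rebalance group streamsApp in state PreparingRebalance with old generation 0 (__consumer_offsets-6)
[2019-08-23 17:12:05,793] INFO [GroupCoordinator 0]: Stabilized group streamsApp generation 1 (__consumer_offsets-6)
[2019-08-23 17:12:05,823] INFO [GroupCoordinator 0]: Preparing to rebalance group streamsApp in state PreparingRebalance with old generation 1 (__consumer_offsets-6)
"""

print(pending_rebalances(sample))  # {'streamsApp': 1}
```

On this excerpt the second rebalance (old generation 1) is reported as still
pending, which matches what I see: the coordinator announces the rebalance for
the new member but generation 2 never stabilizes.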
See attached test, which starts one client and then starts another about 250ms
later. This seems to consistently repro the issue for me.
This is blocking my work on KAFKA-8755, so I'm inclined to pick it up.