[
https://issues.apache.org/jira/browse/STORM-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16014195#comment-16014195
]
Stig Rohde Døssing commented on STORM-2514:
-------------------------------------------
Sorry, I should have explained. The KafkaConsumer supports subscriptions via
two APIs: subscribe() and assign(). subscribe() lets Kafka handle assigning
partitions to each consumer, while assign() lets the calling code assign them
manually. When I asked whether you were using automatic assignment, I meant
which of these APIs your spout was using. The default is subscribe().
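For reference, here's a minimal sketch of the difference between the two APIs (broker address, topic name, and partition numbers are placeholders, not taken from your setup):

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SubscribeVsAssign {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // subscribe(): the group coordinator decides which partitions this
        // consumer gets; consumers joining or leaving the group trigger rebalances.
        KafkaConsumer<String, String> subscribing = new KafkaConsumer<>(props);
        subscribing.subscribe(Arrays.asList("topic"));

        // assign(): the calling code picks partitions explicitly; there is no
        // group coordination and therefore no rebalancing.
        KafkaConsumer<String, String> assigning = new KafkaConsumer<>(props);
        assigning.assign(Arrays.asList(
                new TopicPartition("topic", 0),
                new TopicPartition("topic", 1)));
    }
}
```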
I think the subscribe() API doesn't really work when there are multiple
consumers in a thread. As far as I can tell, the KafkaConsumer will initiate a
partition reassignment whenever a new consumer joins the consumer group. The
rebalance seems to require all active consumers to call poll(), and poll()
doesn't return until the rebalance is complete.
The problem seems to be that when we run multiple spout tasks in one executor,
each consumer triggers a rebalance as it joins the group. The other consumers
in the executor can't call poll() until the rebalance is complete, because the
newly joined consumer is blocking the shared thread in poll(), so they are
booted from the list of active consumers. For the version of Kafka you're
running, this happens once session.timeout.ms expires (default 30 seconds), at
which point the blocking poll() returns and Kafka considers only the newly
joined consumer to be alive. When the other tasks in the executor next call
poll(), they try to rejoin the group, which again causes a rebalance and
reassignment.
The log you posted shows partition reassignment to a new task id every 30
seconds for each executor (look for "Partitions reassignment."). I don't think
the subscribe() API was designed with the expectation that one thread might be
running multiple consumers.
I attached some test code that starts two KafkaConsumers and repeatedly polls
with them. It seems to demonstrate this behavior: with multiple consumers in
one thread, the subscribe() API works poorly and the assignments keep
flip-flopping between the consumers. With one consumer per thread, or manual
assignment through the assign() API, it works fine.
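For anyone who doesn't want to open the attachment, the test is roughly this shape (a sketch, not the exact attached NewClass.java; broker address and topic name are placeholders, and you need a broker running for the flip-flopping to show up):

```java
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TwoConsumersOneThread {

    public static KafkaConsumer<String, String> newConsumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new KafkaConsumer<>(props);
    }

    public static void main(String[] args) {
        // Two consumers in the same group sharing one thread, like multiple
        // spout tasks in one executor.
        KafkaConsumer<String, String> c1 = newConsumer();
        KafkaConsumer<String, String> c2 = newConsumer();
        c1.subscribe(Arrays.asList("topic"));
        c2.subscribe(Arrays.asList("topic"));
        for (int i = 0; i < 100; i++) {
            // While c1 blocks in poll() waiting for a rebalance to complete,
            // c2 can't poll (and in 0.10.0.x heartbeats are only sent from
            // poll()), so it falls out of the group and the assignments
            // flip-flop between the two consumers.
            System.out.println("Polling from c1");
            c1.poll(1000);
            System.out.println("Polling from c2");
            c2.poll(1000);
        }
        c1.close();
        c2.close();
    }
}
```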
Here's a sample of the output:
{code}
176 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.0.0
176 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : b8642491e78c5a13
176 [main] INFO com.mycompany.scratch.NewClass - Polling from c1
301 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator DESKTOP-AGC8TKM:9092 (id: 2147483647 rack: null) for group test.
302 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group test
302 [main] INFO com.mycompany.scratch.NewClass - Partitions [] revoked for c1
302 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group test
309 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group test with generation 42
309 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [topic-0, topic-1] for group test
309 [main] INFO com.mycompany.scratch.NewClass - Partitions [topic-0, topic-1] assigned to c1
2185 [main] INFO com.mycompany.scratch.NewClass - Polling from c2
2290 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator DESKTOP-AGC8TKM:9092 (id: 2147483647 rack: null) for group test.
2292 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Revoking previously assigned partitions [] for group test
2292 [main] INFO com.mycompany.scratch.NewClass - Partitions [] revoked for c2
2292 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group test
30321 [main] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Successfully joined group test with generation 43
30321 [main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [topic-0, topic-1] for group test
30321 [main] INFO com.mycompany.scratch.NewClass - Partitions [topic-0, topic-1] assigned to c2
35340 [main] INFO com.mycompany.scratch.NewClass - Polling from c1
{code}
During the hang in poll(), the consumer group tool shows the group as rebalancing:
{code}
$ ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test
Consumer group `test` is rebalancing.
{code}
I think we can work around this by switching to manual partition assignment (as
the default?), since rebalances don't happen then. We already have support for
this with
https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java.
I'm not sure I see a good reason we'd need Kafka to manage assignments in any
case.
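For concreteness, the wiring I have in mind is roughly this (a sketch only; the ManualPartitionNamedSubscription and KafkaSpoutConfig.Builder constructor signatures below are assumptions on my part, so check them against the storm-kafka-client version you're running before copying):

```java
// Sketch: exact constructor signatures are assumptions, verify before use.
KafkaSpoutConfig<String, String> spoutConfig =
    new KafkaSpoutConfig.Builder<String, String>(
            "localhost:9092",                               // placeholder broker list
            new ManualPartitionNamedSubscription("topic"))  // manual assign(), no group rebalances
        .build();
KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig);
```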
[[email protected]] can you try to set the Subscription to the class I
mentioned above in the KafkaSpoutConfig, and see if it helps?
> Incorrect logs for mapping between Kafka partitions and task IDs
> ----------------------------------------------------------------
>
> Key: STORM-2514
> URL: https://issues.apache.org/jira/browse/STORM-2514
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Reporter: Srishty Agrawal
> Attachments: NewClass.java, worker.log
>
>
> While working on
> [STORM-2506|https://issues.apache.org/jira/browse/STORM-2506], the worker
> logs were generated with debug mode on. The information printed about the
> mapping between task IDs and Kafka partitions contradicted my assumptions. I
> ran a topology which used KafkaSpout from the storm-kafka-client module; it
> had a parallelism hint of 2 (number of executors) and a total of 16 tasks.
> The log lines below show the assigned mapping between executors and Kafka
> partitions:
> {noformat}
> o.a.k.c.c.i.ConsumerCoordinator Thread-12-kafkaspout-executor[3 10] [INFO]
> Setting newly assigned partitions [8topic-4, 8topic-6, 8topic-5, 8topic-7]
> for group kafkaSpoutTestGroup
> o.a.s.k.s.KafkaSpout Thread-12-kafkaspout-executor[3 10] [INFO] Partitions
> reassignment. [taskID=10, consumer-group=kafkaSpoutTestGroup,
> consumer=org.apache.kafka.clients.consumer.KafkaConsumer@108e79ce,
> topic-partitions=[8topic-4, 8topic-6, 8topic-5, 8topic-7]]
> o.a.k.c.c.i.ConsumerCoordinator Thread-8-kafkaspout-executor[11 18] [INFO]
> Setting newly assigned partitions [8topic-2, 8topic-1, 8topic-3, 8topic-0]
> for group kafkaSpoutTestGroup
> o.a.s.k.s.KafkaSpout Thread-8-kafkaspout-executor[11 18] [INFO] Partitions
> reassignment. [taskID=15, consumer-group=kafkaSpoutTestGroup,
> consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2dc37126,
> topic-partitions=[8topic-2, 8topic-1, 8topic-3, 8topic-0]]
> {noformat}
> It is evident that tasks (with IDs 3, 4, 5, 6, 7, 8, 9, 10) in Executor1
> (3 10) will be reading from Kafka partitions 4, 5, 6 and 7. Similarly, tasks
> in Executor2 (11 18) will be reading from Kafka partitions 0, 1, 2 and 3.
> These log lines are printed by the tasks with IDs 10 and 15 in the
> respective executors.
> Logs which emit individual messages do not abide by the above assumption.
> For example, in the log line below, task ID 3 (as part of debugging
> STORM-2506 I added code to print the task ID right next to the component
> ID), which runs on Executor1, reads from partition 2 (the second value
> inside the square brackets) instead of 4, 5, 6 or 7.
> {noformat}Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 3
> default [8topic, 2, 0, null, 1]{noformat}
> This behavior has been summarized in the table below:
> {noformat}
> Task IDs ------- 3, 4, 7, 8, 9, 11, 15, 18 ------------ Partitions 0, 1, 2, 3
> Task IDs ------- 5, 6, 10, 12, 13, 14, 16, 17 --------- Partitions 4, 5, 6, 7
> {noformat}
> [You can find the relevant parts of log file
> here.|https://gist.github.com/srishtyagrawal/f7c53db6b8391e2c3bd522afc93b5351]
>
> Am I misunderstanding something here? Do tasks {{5, 6, 10, 12, 13, 14, 16,
> 17}} correspond to Executor1 and {{3, 4, 7, 8, 9, 11, 15, 18}} to Executor2?
> Are (3 10) not the starting and ending task IDs in Executor1?
> Another interesting thing to note is that task IDs 10 and 15 are always
> reading from the partitions they claimed to be reading from (while setting
> partition assignments).
> If my assumptions are correct, there is a bug in the way the mapping
> information is being passed to worker logs. If not, we need to make changes
> to our docs.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)