Srishty Agrawal created STORM-2514:
--------------------------------------
Summary: Incorrect logs for mapping between Kafka partitions and
task IDs
Key: STORM-2514
URL: https://issues.apache.org/jira/browse/STORM-2514
Project: Apache Storm
Issue Type: Bug
Components: storm-kafka-client
Reporter: Srishty Agrawal
While working on [STORM-2506|https://issues.apache.org/jira/browse/STORM-2506],
I generated the worker logs with debug mode on. The information printed about
the mapping between task IDs and Kafka partitions contradicted my assumptions.
I ran a topology that used KafkaSpout from the storm-kafka-client module, with
a parallelism hint of 2 (number of executors) and a total of 16 tasks.
The log lines below show the Kafka partitions assigned to each executor:
{noformat}
o.a.k.c.c.i.ConsumerCoordinator Thread-12-kafkaspout-executor[3 10] [INFO]
Setting newly assigned partitions [8topic-4, 8topic-6, 8topic-5, 8topic-7] for
group kafkaSpoutTestGroup
o.a.s.k.s.KafkaSpout Thread-12-kafkaspout-executor[3 10] [INFO] Partitions
reassignment. [taskID=10, consumer-group=kafkaSpoutTestGroup,
consumer=org.apache.kafka.clients.consumer.KafkaConsumer@108e79ce,
topic-partitions=[8topic-4, 8topic-6, 8topic-5, 8topic-7]]
o.a.k.c.c.i.ConsumerCoordinator Thread-8-kafkaspout-executor[11 18] [INFO]
Setting newly assigned partitions [8topic-2, 8topic-1, 8topic-3, 8topic-0] for
group kafkaSpoutTestGroup
o.a.s.k.s.KafkaSpout Thread-8-kafkaspout-executor[11 18] [INFO] Partitions
reassignment. [taskID=15, consumer-group=kafkaSpoutTestGroup,
consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2dc37126,
topic-partitions=[8topic-2, 8topic-1, 8topic-3, 8topic-0]]
{noformat}
These lines suggest that all the tasks in Executor1 (3 10) read from Kafka
partitions 4, 5, 6 and 7, and that all the tasks in Executor2 (11 18) read from
Kafka partitions 0, 1, 2 and 3. The lines are printed by the tasks with IDs 10
and 15 in the respective executors.
The log lines for individual emitted messages do not match this assumption. For
example, in the line below, task ID 3, which runs on Executor1, reads from
partition 2 (the second value inside the square brackets) instead of 4, 5, 6
or 7.
{noformat}Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 3
default [8topic, 2, 0, null, 1]{noformat}
This behavior is summarized in the table below:
{noformat}
Task IDs                        Partitions
3, 4, 7, 8, 9, 11, 15, 18       0, 1, 2, 3
5, 6, 10, 12, 13, 14, 16, 17    4, 5, 6, 7
{noformat}
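A rough sketch of how a table like the one above could be derived from the worker log. It assumes each emit record sits on a single line and follows the shape of the one sample shown in this report (component, task ID, stream, then a tuple whose second value is the partition). The regular expression and the function name {{partitions_by_task}} are illustrative only and are not part of Storm.

```python
import re
from collections import defaultdict

# Assumed shape of an emit line, inferred from the single sample in this report:
#   ... Emitting: <component> <taskId> <stream> [<topic>, <partition>, <offset>, ...]
EMIT_RE = re.compile(r"Emitting: (\S+) (\d+) (\S+) \[([^,\]]+), (\d+), (\d+)")


def partitions_by_task(lines):
    """Return {task_id: sorted list of partitions seen in that task's emit lines}."""
    seen = defaultdict(set)
    for line in lines:
        match = EMIT_RE.search(line)
        if match:
            task_id = int(match.group(2))
            partition = int(match.group(5))
            seen[task_id].add(partition)
    return {task: sorted(parts) for task, parts in seen.items()}


# The emit line quoted in this report, joined onto one line.
sample = [
    "Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 3 "
    "default [8topic, 2, 0, null, 1]",
]
print(partitions_by_task(sample))  # {3: [2]}
```

Running it over the full worker log instead of {{sample}} would give one entry per task ID, which can then be grouped by partition set.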
[You can find the relevant parts of the log file
here.|https://gist.github.com/srishtyagrawal/f7c53db6b8391e2c3bd522afc93b5351]
Am I misunderstanding something here? Do tasks {{5, 6, 10, 12, 13, 14, 16, 17}}
correspond to Executor1 and {{3, 4, 7, 8, 9, 11, 15, 18}} correspond to
Executor2? Are (3 10) not the starting and ending task IDs in Executor1?
Another interesting thing to note is that task IDs 10 and 15 always read from
the partitions they claimed to be reading from (while setting partition
assignments).
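The mismatch can be checked mechanically. The sketch below assumes that an executor labelled (3 10) covers task IDs 3 through 10 inclusive, which is exactly the assumption being questioned here. The assignments are copied from the "Partitions reassignment" lines and the observed partitions from the table; the helper names are hypothetical.

```python
def tasks_in_executor(first_task, last_task):
    """Task IDs covered by an executor labelled (first_task last_task), inclusive."""
    return set(range(first_task, last_task + 1))


# Partitions assigned to each executor, per the "Partitions reassignment" lines.
assigned = {
    (3, 10): {4, 5, 6, 7},
    (11, 18): {0, 1, 2, 3},
}

# Partitions each task was observed reading, per the table in this report.
observed = {}
for task in (3, 4, 7, 8, 9, 11, 15, 18):
    observed[task] = {0, 1, 2, 3}
for task in (5, 6, 10, 12, 13, 14, 16, 17):
    observed[task] = {4, 5, 6, 7}


def inconsistent_tasks(assigned, observed):
    """Tasks whose observed partitions fall outside their executor's assignment."""
    bad = []
    for (first, last), partitions in assigned.items():
        for task in sorted(tasks_in_executor(first, last)):
            if not observed[task] <= partitions:
                bad.append(task)
    return bad


print(inconsistent_tasks(assigned, observed))
# [3, 4, 7, 8, 9, 12, 13, 14, 16, 17]
```

Under that assumption, 10 of the 16 tasks log partitions that their executor was never assigned, while tasks 10 and 15 are consistent.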
If my assumptions are correct, there is a bug in the way the mapping
information is being passed to the worker logs. If not, we need to make changes
in our docs.