[
https://issues.apache.org/jira/browse/STORM-2514?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16013239#comment-16013239
]
Srishty Agrawal commented on STORM-2514:
----------------------------------------
Thanks for the clarification. Please find answers to your questions inline:
*How did you get the task lists you posted?*
{noformat}
Task IDs ------- 3, 4, 7, 8, 9, 11, 15, 18 ------------ Partitions 0, 1, 2, 3
Task IDs ------- 5, 6, 10, 12, 13, 14, 16, 17 --------- Partitions 4, 5, 6, 7
{noformat}
The table above is derived from the worker logs. For instance, if you search
for {{kafkaspout 3}}, which means Task ID 3, you will find messages that are
being read from partitions 0, 1, 2 and 3. The table seems to suggest that tasks
{{5, 6, 10, 12, 13, 14, 16, 17}} correspond to executor1 and tasks
{{3, 4, 7, 8, 9, 11, 15, 18}} correspond to executor2.
If I assume that Executor1 should only be running tasks 3, 4, 5, 6, 7, 8, 9,
10, then only these tasks should be reading from partitions 0, 1, 2 or 3.
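As a rough sketch of how such a table could be derived from the worker log, the script below groups "Emitting" lines by task ID and also records the executor range from the thread name. The regular expression and the names {{EMIT_RE}}, {{partitions_by_task}} and {{SAMPLE_LOG}} are assumptions made for illustration; the task ID after the component name is only present because of the extra debug code added for STORM-2506.

```python
import re

# Pattern for the "Emitting" lines quoted in this ticket. The task ID after the
# component name only appears because of the extra debug code added for
# STORM-2506; other log formats would need a different pattern (assumption).
EMIT_RE = re.compile(
    r"executor\[(?P<first>\d+) (?P<last>\d+)\].*?"
    r"Emitting: \S+ (?P<task>\d+) \S+ "
    r"\[[^,\]]+, (?P<partition>\d+),"
)


def partitions_by_task(lines):
    """Map each task ID to its executor range and the partitions it emitted from."""
    table = {}
    for line in lines:
        match = EMIT_RE.search(line)
        if match is None:
            continue  # not an "Emitting" line
        task = int(match.group("task"))
        executor = (int(match.group("first")), int(match.group("last")))
        entry = table.setdefault(task, {"executor": executor, "partitions": set()})
        entry["partitions"].add(int(match.group("partition")))
    return table


# Two of the log lines quoted in this ticket, joined back onto single lines.
SAMPLE_LOG = [
    "2017-05-03 13:50:47.655 o.a.s.d.task Thread-8-kafkaspout-executor[11 18] [INFO] "
    "Emitting: kafkaspout 18 default [8topic, 2, 0, null, 1]",
    "2017-05-03 13:50:47.660 o.a.s.d.task Thread-12-kafkaspout-executor[3 10] [INFO] "
    "Emitting: kafkaspout 7 default [8topic, 2, 0, null, 1]",
]

for task, info in sorted(partitions_by_task(SAMPLE_LOG).items()):
    first, last = info["executor"]
    # True when the task ID falls inside the range shown in the thread name.
    in_range = first <= task <= last
    print(task, info["executor"], sorted(info["partitions"]), in_range)
```

Running this over the full worker.log would show, per task, both the executor thread that logged the emit and the partitions seen, which makes it easier to compare against the partition assignment lines.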
*I don't think it's exactly right that all the tasks in executor1 (3 10) will
be reading from topics 4,6,5,7.*
Sorry for the confusion, I meant that only tasks in Executor1 should read from
partitions 4, 5, 6 or 7. I have reworded the description above accordingly. I
agree that it is possible that not all of these tasks will end up reading from
partitions 4, 5, 6 and 7.
*Each task has its own spout instance (and corresponding KafkaConsumer), so
only task 10 is actually assigned those partitions at the start of the log. I'm
not sure how task 3 is even emitting anything?*
Shouldn't 3 more tasks from Executor1, apart from task ID 10, be assigned to
read from partitions 4, 5, 6 and 7? As far as I remember from the docs, if
there are more tasks than partitions, there should be a one-to-one mapping
between tasks and partitions. Hence it does not surprise me that task 3 is
reading from a Kafka partition, although it does not seem to read from the
assigned Kafka partition (according to the {{Setting newly assigned}} log
line).
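To make the expectation above concrete, here is a minimal sketch that hands out Executor1's four partitions to its eight tasks, one partition per task. This only illustrates the expectation stated in this comment; it is not Kafka's partition assignor or storm-kafka-client's logic, and {{expected_assignment}} is a hypothetical helper.

```python
def expected_assignment(task_ids, partitions):
    """Hand out partitions round-robin; tasks beyond the partition count stay idle.

    Illustrates the expectation in the comment only. This is NOT how Kafka or
    storm-kafka-client actually assign partitions (assumption for illustration).
    """
    assignment = {task: [] for task in task_ids}
    for index, partition in enumerate(partitions):
        task = task_ids[index % len(task_ids)]
        assignment[task].append(partition)
    return assignment


executor1_tasks = list(range(3, 11))   # tasks 3..10, per the executor[3 10] thread name
executor1_partitions = [4, 5, 6, 7]    # from the "Setting newly assigned partitions" line

assignment = expected_assignment(executor1_tasks, executor1_partitions)
# Keep only the tasks that received at least one partition.
busy = {task: parts for task, parts in assignment.items() if parts}
print(busy)
```

Under this expectation, exactly four of the eight tasks would own one partition each and the rest would be idle. Which four tasks are chosen is arbitrary in this sketch, so it says nothing about why task 10 in particular appears in the log.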
*I also noticed that many tasks are emitting what appears to be the same
message?*
{noformat}
e.g.
2017-05-03 13:50:47.655 o.a.s.d.task Thread-8-kafkaspout-executor[11 18] [INFO]
Emitting: kafkaspout 18 default [8topic, 2, 0, null, 1]
2017-05-03 13:50:47.658 o.a.s.d.task Thread-8-kafkaspout-executor[11 18] [INFO]
Emitting: kafkaspout 11 default [8topic, 2, 0, null, 1]
2017-05-03 13:50:47.660 o.a.s.d.task Thread-12-kafkaspout-executor[3 10] [INFO]
Emitting: kafkaspout 7 default [8topic, 2, 0, null, 1]
2017-05-03 13:50:47.670 o.a.s.d.task Thread-12-kafkaspout-executor[3 10] [INFO]
Emitting: kafkaspout 8 default [8topic, 2, 0, null, 1]
{noformat}
Are you deriving this from the fact that all of the above messages have the
same message ID (0)?
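One way to check for repeated emits is to group the "Emitting" lines by topic, partition and the third tuple value, then list the tasks that emitted each combination. The sketch below assumes the third value is the message ID (offset) and that the log format matches the lines quoted above; {{RECORD_RE}}, {{tasks_per_record}} and {{duplicates}} are hypothetical names.

```python
import re
from collections import defaultdict

# Assumes tuples are logged as [topic, partition, offset, key, value], with the
# debug-only task ID printed after the component name (assumption).
RECORD_RE = re.compile(
    r"Emitting: \S+ (?P<task>\d+) \S+ "
    r"\[(?P<topic>[^,\]]+), (?P<partition>\d+), (?P<offset>\d+),"
)


def tasks_per_record(lines):
    """Map (topic, partition, offset) to the set of task IDs that emitted it."""
    seen = defaultdict(set)
    for line in lines:
        match = RECORD_RE.search(line)
        if match:
            key = (
                match.group("topic"),
                int(match.group("partition")),
                int(match.group("offset")),
            )
            seen[key].add(int(match.group("task")))
    return dict(seen)


def duplicates(lines):
    """Return only the records that more than one task emitted."""
    return {
        key: sorted(tasks)
        for key, tasks in tasks_per_record(lines).items()
        if len(tasks) > 1
    }


# The four "Emitting" lines quoted above, without timestamps and thread names.
LOG = [
    "Emitting: kafkaspout 18 default [8topic, 2, 0, null, 1]",
    "Emitting: kafkaspout 11 default [8topic, 2, 0, null, 1]",
    "Emitting: kafkaspout 7 default [8topic, 2, 0, null, 1]",
    "Emitting: kafkaspout 8 default [8topic, 2, 0, null, 1]",
]

print(duplicates(LOG))
```

For the four quoted lines, all four tasks map to the same (topic, partition, offset) key, which is what the question about "the same message" appears to refer to.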
*Is the start of the log the first occurrence of partition assignments in the
log?*
No, I took a random segment from the worker logs. I am attaching the full
worker.log file to this ticket.
*Are you using automatic or manual subscription to Kafka?*
I did not understand the question. I am running Kafka on my local machine and
using a KafkaSpout to read a topic from this local instance of Kafka.
> Incorrect logs for mapping between Kafka partitions and task IDs
> ----------------------------------------------------------------
>
> Key: STORM-2514
> URL: https://issues.apache.org/jira/browse/STORM-2514
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Reporter: Srishty Agrawal
>
> While working on
> [STORM-2506|https://issues.apache.org/jira/browse/STORM-2506], the worker
> logs were generated with debug mode on. The information printed about the
> mapping between Task IDs and Kafka partitions contradicted my assumptions. I
> ran a topology that used KafkaSpout from the storm-kafka-client module, with
> a parallelism hint of 2 (number of executors) and a total of 16 tasks.
> The log lines below show the assigned mapping between executors and
> Kafka partitions:
> {noformat}
> o.a.k.c.c.i.ConsumerCoordinator Thread-12-kafkaspout-executor[3 10] [INFO]
> Setting newly assigned partitions [8topic-4, 8topic-6, 8topic-5, 8topic-7]
> for group kafkaSpoutTestGroup
> o.a.s.k.s.KafkaSpout Thread-12-kafkaspout-executor[3 10] [INFO] Partitions
> reassignment. [taskID=10, consumer-group=kafkaSpoutTestGroup,
> consumer=org.apache.kafka.clients.consumer.KafkaConsumer@108e79ce,
> topic-partitions=[8topic-4, 8topic-6, 8topic-5, 8topic-7]]
> o.a.k.c.c.i.ConsumerCoordinator Thread-8-kafkaspout-executor[11 18] [INFO]
> Setting newly assigned partitions [8topic-2, 8topic-1, 8topic-3, 8topic-0]
> for group kafkaSpoutTestGroup
> o.a.s.k.s.KafkaSpout Thread-8-kafkaspout-executor[11 18] [INFO] Partitions
> reassignment. [taskID=15, consumer-group=kafkaSpoutTestGroup,
> consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2dc37126,
> topic-partitions=[8topic-2, 8topic-1, 8topic-3, 8topic-0]]
> {noformat}
> It is evident that only tasks (with ID 3, 4, 5, 6, 7, 8, 9, 10) in Executor1
> (3 10) will be reading from kafka partitions 4, 5, 6 and 7. Similarly, tasks
> in Executor2 (11 18) will be reading from kafka partitions 0, 1, 2 and 3.
> These log lines are being printed by Tasks with IDs 10 and 15 in respective
> executors.
> Log lines for individual emitted messages do not agree with the above
> assumption. For example, in the log line below, Task ID 3, which runs on
> Executor1, reads from partition 2 (the second value inside the square
> brackets) instead of 4, 5, 6 or 7. (As part of debugging STORM-2506, I added
> code to print the Task ID right next to the component ID.)
> {noformat}Thread-12-kafkaspout-executor[3 10] [INFO] Emitting: kafkaspout 3
> default [8topic, 2, 0, null, 1]{noformat}
> This behavior is summarized in the table below:
> {noformat}
> Task IDs ------- 3, 4, 7, 8, 9, 11, 15, 18 ------------ Partitions 0, 1, 2, 3
> Task IDs ------- 5, 6, 10, 12, 13, 14, 16, 17 --------- Partitions 4, 5, 6, 7
> {noformat}
> [You can find the relevant parts of log file
> here.|https://gist.github.com/srishtyagrawal/f7c53db6b8391e2c3bd522afc93b5351]
>
> Am I misunderstanding something here? Do tasks {{5, 6, 10, 12, 13, 14, 16,
> 17}} correspond to executor1 and {{3, 4, 7, 8, 9, 11, 15, 18}} correspond to
> executor2? Are (3 10) not the starting and ending task IDs in Executor1?
> Another interesting thing to note is that Task IDs 10 and 15 are always
> reading from the partitions they claimed to be reading from (while setting
> partition assignments).
> If my assumptions are correct, there is a bug in the way the mapping
> information is being passed to the worker logs. If not, we need to make
> changes in our docs.