[ 
https://issues.apache.org/jira/browse/STORM-3090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nikita Gorbachevski updated STORM-3090:
---------------------------------------
    Description: 
Our topology uses KafkaSpout to fetch messages from Kafka topics. We have ~150 topics with ~12 partitions each, and 8 Storm executors and tasks on 2 Storm nodes.
Storm version 1.0.5, Kafka broker version 10.0.2, Kafka client version 0.9.0.1. We do not delete Kafka topics.
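For reference, this is roughly how such a storm-kafka spout is configured (a minimal sketch for illustration, not our actual code; the ZooKeeper address, topic name, ZK root and spout id are placeholders). It shows the useStartOffsetTimeIfOffsetOutOfRange flag discussed further below:
{code:java}
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaSpoutSetupSketch {
    public static void main(String[] args) {
        // Placeholder ZooKeeper connection; in our deployment there is one spout config per topic.
        ZkHosts hosts = new ZkHosts("zk1:2181");
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "topic_1", "/kafka-offsets", "kafka-spout-topic_1");

        // Relevant to this issue: jump to the configured start offset when a fetch offset is out of range.
        spoutConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
        spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

        TopologyBuilder builder = new TopologyBuilder();
        // 8 executors, matching the parallelism described above.
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 8);
    }
}
{code}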

At some point I observed a huge number of repetitive WARN messages in worker.log:

2018-05-29 14:36:57.928 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition\{host1:9092, topic=topic_1, partition=10} Got fetch request with offset out of range: [9248]
2018-05-29 14:36:57.929 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition\{host=host2:9092, topic=topic_2, partition=0} Got fetch request with offset out of range: [22650006]
2018-05-29 14:36:57.930 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition\{host=host2:9092, topic=topic_1, partition=2} Got fetch request with offset out of range: [6780]
2018-05-29 14:36:57.930 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition\{host=host3:9092, topic=topic_3, partition=4} Got fetch request with offset out of range: [1011584]
2018-05-29 14:36:57.932 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] Partition\{host1:9092, topic=topic4, partition=4} Got fetch request with offset out of range: [9266]
2018-05-29 14:36:57.933 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] Partition\{host=host2:9092, topic=topic5, partition=4} Got fetch request with offset out of range: [9266]
2018-05-29 14:36:57.935 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition\{host1:9092, topic=topic6, partition=4} Got fetch request with offset out of range: [1011584]
2018-05-29 14:36:57.935 o.a.s.k.KafkaUtils Thread-29-kafka-spout-executor[14 14] [WARN] Partition\{host1:9092, topic=topic7, partition=2} Got fetch request with offset out of range: [337]
2018-05-29 14:36:57.936 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition\{host=host2:9092, topic=topic6, partition=10} Got fetch request with offset out of range: [9248]

For some reason the same *constant* offset value is used for the same partition 
of different topics.

I enabled DEBUG logging and examined the log files more closely:
2018-05-29 14:37:03.573 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition\{host=host3:9092, topic=topic1, partition=8} for topology: topology1
2018-05-29 14:37:03.577 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition\{host=host1:9092, topic=topic2, partition=8} for topology: topology1
2018-05-29 14:37:03.578 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition\{host=host2:9092, topic=topic3, partition=8} for topology: topology1
...
2018-05-29 14:38:07.581 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition\{host=host1:9092, topic=topic4, partition=8} for topology: topology1
2018-05-29 14:38:07.582 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition\{host=host2:9092, topic=topic5, partition=8} for topology: topology1
2018-05-29 14:38:07.584 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition\{host=host3:9092, topic=topic6, partition=8} for topology: topology1

I noticed that a subset of the topics was split into two independent groups, each consisting of 31 topics.
All topics within a group used the same offset value for a given partition. That value was not constant, however; it varied among 8 different values.
Each of these 8 values was correct for one particular topic in the group. Moreover, each of these values grew over time, and all topics in the group updated it synchronously.
Most of the topics (55 of 62) had a corresponding 'offset out of range' WARN message, but with a *constant* value. The other 7 topics continued to work correctly without WARN messages, but their offset values were changing as well.

I went through the storm-kafka source code and noticed that the useStartOffsetTimeIfOffsetOutOfRange flag does not take effect in our case, because we have no failed tuples and the Kafka offset is less than _emittedToOffset, so the same WARN message is logged again and again.
{code:java}
        } catch (TopicOffsetOutOfRangeException e) {
            offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition,
                    kafka.api.OffsetRequest.EarliestTime());
            // fetch failed, so don't update the fetch metrics

            //fix bug [STORM-643] : remove outdated failed offsets
            if (!processingNewTuples) {
                // For the case of EarliestTime it would be better to discard
                // all the failed offsets, that are earlier than actual EarliestTime
                // offset, since they are anyway not there.
                // These calls to broker API will be then saved.
                Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);

                // Omitted messages have not been acked and may be lost
                if (null != omitted) {
                    _lostMessageCount.incrBy(omitted.size());
                }

                _pending.headMap(offset).clear();

                LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
            }

            if (offset > _emittedToOffset) {
                _lostMessageCount.incrBy(offset - _emittedToOffset);
                _emittedToOffset = offset;
                LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
            }

            return;
        }
{code}
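To make the repeating WARN concrete, here is a minimal sketch (not storm-kafka source; the offset values are hypothetical and only chosen to match the pattern in the logs) of why neither guard above changes any state in our situation:
{code:java}
public class RepeatingWarnSketch {
    public static void main(String[] args) {
        // Illustrative values: 9248 matches the offset seen in the WARN messages above.
        long emittedToOffset = 9248L;  // _emittedToOffset: the next fetch is issued at this offset and is rejected as out of range
        long earliestOffset = 9000L;   // offset returned by KafkaUtils.getOffset(..., EarliestTime()) in the catch block

        boolean processingNewTuples = true; // we have no failed tuples being retried

        // Mirrors the two guards in the catch block above:
        if (!processingNewTuples) {
            // never entered: there are no failed offsets to clear
        }
        if (earliestOffset > emittedToOffset) {
            // never entered: the earliest offset is below _emittedToOffset, so it is not advanced
        }
        // No state changed, so the next fill() fetches the same offset (9248),
        // hits TopicOffsetOutOfRangeException again, and logs the identical WARN.
        System.out.println("_emittedToOffset stays at " + emittedToOffset);
    }
}
{code}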
However, I don't understand how it is possible that _emittedToOffset got the same value for different topics. Do you have any ideas why this could happen?


> The same offset value is used by different topics.
> --------------------------------------------------
>
>                 Key: STORM-3090
>                 URL: https://issues.apache.org/jira/browse/STORM-3090
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 1.0.5
>            Reporter: Nikita Gorbachevski
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
