[ https://issues.apache.org/jira/browse/STORM-3090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nikita Gorbachevski updated STORM-3090:
---------------------------------------
Description:
Our topology uses KafkaSpout to fetch messages from Kafka topics. We have ~150 topics with ~12 partitions each, and 8 Storm executors and tasks on 2 Storm nodes. Storm version 1.0.5, Kafka brokers version 10.0.2, Kafka clients version 0.9.0.1. We do not delete Kafka topics.

At some point I observed a huge number of repetitive WARN messages in worker.log:

2018-05-29 14:36:57.928 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition\{host1:9092, topic=topic_1, partition=10} Got fetch request with offset out of range: [9248]
2018-05-29 14:36:57.929 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition\{host=host2:9092, topic=topic_2, partition=0} Got fetch request with offset out of range: [22650006]
2018-05-29 14:36:57.930 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition\{host=host2:9092, topic=topic_1, partition=2} Got fetch request with offset out of range: [6780]
2018-05-29 14:36:57.930 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition\{host=host3:9092, topic=topic_3, partition=4} Got fetch request with offset out of range: [1011584]
2018-05-29 14:36:57.932 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] Partition\{host1:9092, topic=topic4, partition=4} Got fetch request with offset out of range: [9266]
2018-05-29 14:36:57.933 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] Partition\{host=host2:9092, topic=topic5, partition=4} Got fetch request with offset out of range: [9266]
2018-05-29 14:36:57.935 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition\{host1:9092, topic=topic6, partition=4} Got fetch request with offset out of range: [1011584]
2018-05-29 14:36:57.935 o.a.s.k.KafkaUtils Thread-29-kafka-spout-executor[14 14] [WARN] Partition\{host1:9092, topic=topic7, partition=2} Got fetch request with offset out of range: [337]
2018-05-29 14:36:57.936 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition\{host=host2:9092, topic=topic6, partition=10} Got fetch request with offset out of range: [9248]

For some reason the same *constant* offset value is used for the same partition of different topics. I enabled DEBUG mode and examined the log files more closely:

2018-05-29 14:37:03.573 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition\{host=host3:9092, topic=topic1, partition=8} for topology: topology1
2018-05-29 14:37:03.577 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition\{host=host1:9092, topic=topic2, partition=8} for topology: topology1
2018-05-29 14:37:03.578 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition\{host=host2:9092, topic=topic3, partition=8} for topology: topology1
...
2018-05-29 14:38:07.581 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition\{host=host1:9092, topic=topic4, partition=8} for topology: topology1
2018-05-29 14:38:07.582 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition\{host=host2:9092, topic=topic5, partition=8} for topology: topology1
2018-05-29 14:38:07.584 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition\{host=host3:9092, topic=topic6, partition=8} for topology: topology1

I noticed that a subset of the topics was split into two independent groups. Each group consisted of 31 topics. All the topics in each group were using the same offset value for each partition. However, that value wasn't constant and varied among 8 different values.
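One way offsets could move in lockstep across topics like this (purely a hypothesis for illustration, not the actual storm-kafka code; class, field, and method names below are invented) is if per-partition offset state were keyed by the partition number alone, ignoring the topic. A minimal Java sketch of that failure mode:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration: if per-partition offset state is keyed by the
// partition number alone, every topic that has that partition number reads
// and writes the SAME offset entry, so their offsets appear synchronized.
public class SharedOffsetDemo {
    // Buggy keying: partition id only, topic ignored.
    static final Map<Integer, Long> offsetsByPartitionOnly = new HashMap<>();
    // Correct keying: (topic, partition) pair.
    static final Map<String, Long> offsetsByTopicPartition = new HashMap<>();

    static void commit(String topic, int partition, long offset) {
        offsetsByPartitionOnly.put(partition, offset);                // collides across topics
        offsetsByTopicPartition.put(topic + "-" + partition, offset); // stays independent
    }

    public static void main(String[] args) {
        commit("topic1", 8, 1572936L);
        commit("topic2", 8, 61292573L); // overwrites topic1's entry in the buggy map

        // With partition-only keying, topic1 now "sees" topic2's offset.
        System.out.println(offsetsByPartitionOnly.get(8));           // 61292573
        System.out.println(offsetsByTopicPartition.get("topic1-8")); // 1572936
    }
}
```

The same effect would follow from a Partition class whose equals/hashCode compares only the partition number, since hash-based lookups would then treat topic_1/10 and topic6/10 as the same key.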
Each of these 8 values was correct for a particular topic in the group. Moreover, each of these values was growing over time and all the topics updated it synchronously.

Most of the topics (55 of 62) in each group had a corresponding 'offset out of range' WARN message, but with the *constant* value. The other 7 topics continued to work correctly without WARN messages, but their offset value was changing as well.

I went through the source code of storm-kafka and noticed that the useStartOffsetTimeIfOffsetOutOfRange flag doesn't work in our case, because we don't have failed tuples and the Kafka offset is less than _emittedToOffset. So the same WARN message is logged again and again.

{code:java}
} catch (TopicOffsetOutOfRangeException e) {
    offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
    // fetch failed, so don't update the fetch metrics
    //fix bug [STORM-643] : remove outdated failed offsets
    if (!processingNewTuples) {
        // For the case of EarliestTime it would be better to discard
        // all the failed offsets, that are earlier than actual EarliestTime
        // offset, since they are anyway not there.
        // These calls to broker API will be then saved.
        Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);
        // Omitted messages have not been acked and may be lost
        if (null != omitted) {
            _lostMessageCount.incrBy(omitted.size());
        }
        _pending.headMap(offset).clear();
        LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
    }
    if (offset > _emittedToOffset) {
        _lostMessageCount.incrBy(offset - _emittedToOffset);
        _emittedToOffset = offset;
        LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
    }
    return;
}
{code}

However, I don't understand how it is possible that _emittedToOffset got the same value for different topics. Do you have any ideas why this could happen?

> The same offset value is used by different topics.
> --------------------------------------------------
>
>             Key: STORM-3090
>             URL: https://issues.apache.org/jira/browse/STORM-3090
>         Project: Apache Storm
>      Issue Type: Bug
>      Components: storm-kafka
> Affects Versions: 1.0.5
>        Reporter: Nikita Gorbachevski
>        Priority: Major

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
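For reference, the repeated-WARN behaviour the report describes can be reduced to a small standalone simulation. This is a sketch under the report's stated condition (the broker's earliest offset stays below _emittedToOffset); the class and variable names are invented and this is not storm-kafka code:

```java
// Sketch of the out-of-range handling path: when the broker's earliest
// offset is NOT ahead of the spout's fetch position, the handler leaves
// the position untouched, so the next fetch uses the same out-of-range
// offset and the same WARN is logged again, indefinitely.
public class OutOfRangeLoopDemo {
    static long emittedToOffset = 22650006L; // spout's next fetch position
    static int warnCount = 0;

    // Returns true only if the handler advanced the fetch position.
    static boolean handleOffsetOutOfRange(long earliestOffset) {
        warnCount++; // stands in for the repeated "offset out of range" WARN
        if (earliestOffset > emittedToOffset) {
            emittedToOffset = earliestOffset; // jump forward; the loop breaks
            return true;
        }
        return false; // earliest <= emittedToOffset: nothing changes, WARN repeats
    }

    public static void main(String[] args) {
        long brokerEarliest = 9248L; // earliest offset reported by the broker
        for (int fetch = 0; fetch < 3; fetch++) {
            handleOffsetOutOfRange(brokerEarliest); // same outcome every fetch
        }
        System.out.println(warnCount);       // 3: one WARN per retried fetch
        System.out.println(emittedToOffset); // unchanged: 22650006
    }
}
```

This matches the report: the guard only moves _emittedToOffset forward, so a fetch position that is already past the broker's earliest offset (yet still out of range, e.g. because it belongs to a different topic's offset space) never gets corrected.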