[ https://issues.apache.org/jira/browse/STORM-3090?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nikita Gorbachevski updated STORM-3090:
---------------------------------------
    Priority: Critical  (was: Major)

> The same offset value is used by different topics.
> --------------------------------------------------
>
>                 Key: STORM-3090
>                 URL: https://issues.apache.org/jira/browse/STORM-3090
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 1.0.5
>            Reporter: Nikita Gorbachevski
>            Priority: Critical
>
> Our topology uses KafkaSpout to fetch messages from Kafka topics. We have ~150 topics with ~12 partitions each, and 8 Storm executors and tasks on 2 Storm nodes. Storm version 1.0.5, Kafka brokers version 10.0.2, Kafka clients version 0.9.0.1. We do not delete Kafka topics.
> At some point in time I observed a huge number of repetitive WARN messages in worker.log:
> 2018-05-29 14:36:57.928 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition{host=host1:9092, topic=topic_1, partition=10} Got fetch request with offset out of range: [9248]
> 2018-05-29 14:36:57.929 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition{host=host2:9092, topic=topic_2, partition=0} Got fetch request with offset out of range: [22650006]
> 2018-05-29 14:36:57.930 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition{host=host2:9092, topic=topic_1, partition=2} Got fetch request with offset out of range: [6780]
> 2018-05-29 14:36:57.930 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition{host=host3:9092, topic=topic_3, partition=4} Got fetch request with offset out of range: [1011584]
> 2018-05-29 14:36:57.932 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] Partition{host=host1:9092, topic=topic4, partition=4} Got fetch request with offset out of range: [9266]
> 2018-05-29 14:36:57.933 o.a.s.k.KafkaUtils Thread-7-kafka-spout-executor[12 12] [WARN] Partition{host=host2:9092, topic=topic5, partition=4} Got fetch request with offset out of range: [9266]
> 2018-05-29 14:36:57.935 o.a.s.k.KafkaUtils Thread-23-kafka-spout-executor[16 16] [WARN] Partition{host=host1:9092, topic=topic6, partition=4} Got fetch request with offset out of range: [1011584]
> 2018-05-29 14:36:57.935 o.a.s.k.KafkaUtils Thread-29-kafka-spout-executor[14 14] [WARN] Partition{host=host1:9092, topic=topic7, partition=2} Got fetch request with offset out of range: [337]
> 2018-05-29 14:36:57.936 o.a.s.k.KafkaUtils Thread-15-kafka-spout-executor[18 18] [WARN] Partition{host=host2:9092, topic=topic6, partition=10} Got fetch request with offset out of range: [9248]
> For some reason the same *constant* offset value is used for the same partition of different topics.
> I enabled DEBUG logging and examined the log files more closely.
> 2018-05-29 14:37:03.573 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition{host=host3:9092, topic=topic1, partition=8} for topology: topology1
> 2018-05-29 14:37:03.577 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition{host=host1:9092, topic=topic2, partition=8} for topology: topology1
> 2018-05-29 14:37:03.578 o.a.s.k.PartitionManager Thread-7-kafka-spout-executor[12 12] [DEBUG] Wrote last completed offset (1572936) to ZK for Partition{host=host2:9092, topic=topic3, partition=8} for topology: topology1
> ...
> 2018-05-29 14:38:07.581 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition{host=host1:9092, topic=topic4, partition=8} for topology: topology1
> 2018-05-29 14:38:07.582 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition{host=host2:9092, topic=topic5, partition=8} for topology: topology1
> 2018-05-29 14:38:07.584 o.a.s.k.PartitionManager Thread-23-kafka-spout-executor[16 16] [DEBUG] Wrote last completed offset (61292573) to ZK for Partition{host=host3:9092, topic=topic6, partition=8} for topology: topology1
> I noticed that a subset of the topics was split into two independent groups, each consisting of 31 topics. All the topics in a group used the same offset value for each partition. However, that value wasn't constant and varied among 8 different values; each of these 8 values was correct for one particular topic in the group. Moreover, each of these values grew over time, and all the topics updated it synchronously.
> Most of the topics (55 of 62) in each group had a corresponding 'offset out of range' WARN message, but with the *constant* value. The other 7 topics continued to work correctly without WARN messages, but their offset value was changing as well.
> I went through the storm-kafka source code and noticed that the useStartOffsetTimeIfOffsetOutOfRange flag doesn't work in our case, because we don't have failed tuples and the Kafka offset is less than _emittedToOffset. So the same WARN message is logged again and again.
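The stuck-WARN behaviour described above can be reduced to a tiny self-contained sketch. This is not the real PartitionManager code; the class, fields, and method below are hypothetical simplifications that only model the `offset > _emittedToOffset` guard: when the broker's earliest valid offset is still behind _emittedToOffset and there are no failed tuples to clear, nothing is updated, so the next fetch is out of range again and the same WARN repeats.

```java
// Hypothetical, simplified model of the guard in storm-kafka's
// PartitionManager; all names here are illustrative, not the real API.
public class OffsetGuardSketch {
    long emittedToOffset; // what the spout believes it has emitted up to
    int warnCount;        // how many "offset out of range" WARNs were logged

    OffsetGuardSketch(long emittedToOffset) {
        this.emittedToOffset = emittedToOffset;
    }

    /** Simulates one fetch attempt that fails with "offset out of range". */
    void onOffsetOutOfRange(long earliestBrokerOffset) {
        warnCount++; // the WARN is logged on every failed fetch
        if (earliestBrokerOffset > emittedToOffset) {
            // Only when the broker's earliest offset is AHEAD of us does the
            // spout jump forward and recover.
            emittedToOffset = earliestBrokerOffset;
        }
        // Otherwise emittedToOffset is untouched and the next fetch fails again.
    }

    public static void main(String[] args) {
        OffsetGuardSketch pm = new OffsetGuardSketch(9248L);
        // Earliest broker offset (100) is BEHIND what we think we emitted,
        // as in the reported case: the guard never fires.
        pm.onOffsetOutOfRange(100L);
        pm.onOffsetOutOfRange(100L);
        System.out.println(pm.emittedToOffset); // stays 9248
        System.out.println(pm.warnCount);       // 2: the WARN repeats forever
    }
}
```

Under this model, a wrong-but-large _emittedToOffset (e.g. one borrowed from another topic's partition) permanently pins the spout in the warning loop, which matches the constant [9248]-style values in the WARN lines above.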
> {code:java}
>         } catch (TopicOffsetOutOfRangeException e) {
>             offset = KafkaUtils.getOffset(_consumer, _partition.topic,
>                     _partition.partition, kafka.api.OffsetRequest.EarliestTime());
>             // fetch failed, so don't update the fetch metrics
>             //fix bug [STORM-643] : remove outdated failed offsets
>             if (!processingNewTuples) {
>                 // For the case of EarliestTime it would be better to discard
>                 // all the failed offsets, that are earlier than actual
>                 // EarliestTime offset, since they are anyway not there.
>                 // These calls to broker API will be then saved.
>                 Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);
>                 // Omitted messages have not been acked and may be lost
>                 if (null != omitted) {
>                     _lostMessageCount.incrBy(omitted.size());
>                 }
>                 _pending.headMap(offset).clear();
>                 LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
>             }
>             if (offset > _emittedToOffset) {
>                 _lostMessageCount.incrBy(offset - _emittedToOffset);
>                 _emittedToOffset = offset;
>                 LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
>             }
>             return;
>         }
> {code}
> However, I don't understand how it is possible that _emittedToOffset got the same value for different topics. Do you have any ideas why this could happen?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)