[ https://issues.apache.org/jira/browse/STORM-2352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stig Rohde Døssing resolved STORM-2352.
---------------------------------------
Resolution: Fixed
2413 should resolve this
> New Kafka spout retries for ever even with retries of 5
> -------------------------------------------------------
>
> Key: STORM-2352
> URL: https://issues.apache.org/jira/browse/STORM-2352
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.0.0, 1.1.0
> Reporter: Kishore Senji
> Attachments: KafkaSpoutTest.java
>
>
> v1.0.0 and above
> A KafkaSpout is created with a KafkaSpoutConfig whose maxRetries is 5, yet
> the KafkaSpout still retries the failed tuple forever.
> Reason:
> numFails is incremented in the fail() method of KafkaSpout:
> {code}
> public void fail(Object messageId) {
>     final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
>     emitted.remove(msgId);
>     if (msgId.numFails() < maxRetries) {
>         msgId.incrementNumFails();
>         retryService.schedule(msgId);
>     } else { // limit to max number of retries
>         LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
>         ack(msgId);
>     }
> }
> {code}
> However, emitTupleIfNotEmitted() creates a new KafkaSpoutMessageId, checks
> whether that msgId is ready to be emitted (in the failure case) and, if so,
> emits the new msgId instance, thus losing the numFails accumulated earlier:
> {code}
> private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
>     final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
>     final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
>     if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) { // has been acked
>         LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
>     } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
>         LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
>     } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {
>         // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
>         final List<Object> tuple = tuplesBuilder.buildTuple(record);
>         kafkaSpoutStreams.emit(collector, tuple, msgId);
>         emitted.add(msgId);
>         numUncommittedOffsets++;
>         if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
>             retryService.remove(msgId); // re-emitted hence remove from failed
>         }
>         LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
>     }
> }
> {code}
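
The effect described above can be reproduced in isolation. The following is only a minimal sketch: MsgId and RetryCountLossDemo are hypothetical stand-ins, not storm-kafka-client classes, and the sketch assumes that a message id's equality depends only on topic, partition and offset while numFails is per-instance state.

```java
import java.util.Objects;

// Hypothetical stand-in for KafkaSpoutMessageId (an assumption, not the real class):
// identity is (topic, partition, offset); numFails is ignored by equals().
final class MsgId {
    final String topic;
    final int partition;
    final long offset;
    private int numFails;

    MsgId(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    int numFails() { return numFails; }
    void incrementNumFails() { numFails++; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof MsgId)) return false;
        MsgId other = (MsgId) o;
        return topic.equals(other.topic) && partition == other.partition && offset == other.offset;
    }

    @Override
    public int hashCode() { return Objects.hash(topic, partition, offset); }
}

final class RetryCountLossDemo {
    // Each emit builds a fresh MsgId, as emitTupleIfNotEmitted() does,
    // so the check from fail() always sees numFails == 0.
    static int retriesWithFreshIds(int failures, int maxRetries) {
        int scheduled = 0;
        for (int i = 0; i < failures; i++) {
            MsgId emittedId = new MsgId("topic", 0, 42L);
            if (emittedId.numFails() < maxRetries) {
                emittedId.incrementNumFails();
                scheduled++;
            }
        }
        return scheduled;
    }

    // The same instance is reused across emits, so numFails accumulates
    // and the retry limit takes effect.
    static int retriesWithReusedId(int failures, int maxRetries) {
        MsgId id = new MsgId("topic", 0, 42L);
        int scheduled = 0;
        for (int i = 0; i < failures; i++) {
            if (id.numFails() < maxRetries) {
                id.incrementNumFails();
                scheduled++;
            }
        }
        return scheduled;
    }
}
```

With 100 simulated failures and maxRetries of 5, the fresh-id variant schedules a retry every time, while the reused-id variant stops after 5.
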
> isReady() has no side effects; it only looks up the msgId and returns true.
> Possible fixes: modify the RetryService interface so that it hands back the
> msgId it holds; give isReady() a side effect that attaches the numFails from
> the previous attempt; or add a 'failed' collection to KafkaSpout that tracks
> failed msgs (similar to acked) and emit with the msgId from 'failed' when
> isReady() is true.
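
For illustration, the third option (tracking failed msgs and reusing the stored instance on re-emit) might look roughly like the sketch below. TrackedMsgId and FailedRegistry are hypothetical names invented for this example; this is not the change that was actually committed, only one possible shape of the idea under the assumption that message id equality ignores numFails.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for KafkaSpoutMessageId; equality ignores numFails.
final class TrackedMsgId {
    final String topic;
    final int partition;
    final long offset;
    private int numFails;

    TrackedMsgId(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    int numFails() { return numFails; }
    void incrementNumFails() { numFails++; }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TrackedMsgId)) return false;
        TrackedMsgId other = (TrackedMsgId) o;
        return topic.equals(other.topic) && partition == other.partition && offset == other.offset;
    }

    @Override
    public int hashCode() { return Objects.hash(topic, partition, offset); }
}

// Hypothetical helper (not part of storm-kafka-client) that remembers failed ids.
final class FailedRegistry {
    private final Map<TrackedMsgId, TrackedMsgId> failed = new HashMap<>();
    private final int maxRetries;

    FailedRegistry(int maxRetries) { this.maxRetries = maxRetries; }

    // Mirrors the check in fail(): returns true if the message should be retried,
    // false once the retry limit has been reached.
    boolean recordFailure(TrackedMsgId msgId) {
        if (msgId.numFails() < maxRetries) {
            msgId.incrementNumFails();
            failed.put(msgId, msgId); // keep the instance that carries numFails
            return true;
        }
        failed.remove(msgId);
        return false;
    }

    // On emit: prefer the previously failed instance so numFails is preserved.
    TrackedMsgId idForEmit(TrackedMsgId fresh) {
        TrackedMsgId previous = failed.get(fresh);
        return previous != null ? previous : fresh;
    }
}
```

In this sketch the emit path would call idForEmit() with the freshly built id and emit with whatever it returns, so a tuple that keeps failing is given up on after maxRetries attempts.
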