[ 
https://issues.apache.org/jira/browse/STORM-2352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stig Rohde Døssing resolved STORM-2352.
---------------------------------------
    Resolution: Fixed

2413 should resolve this

> New Kafka spout retries for ever even with retries of 5
> -------------------------------------------------------
>
>                 Key: STORM-2352
>                 URL: https://issues.apache.org/jira/browse/STORM-2352
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.0, 1.1.0
>            Reporter: Kishore Senji
>         Attachments: KafkaSpoutTest.java
>
>
> Affects v1.0.0 and above.
> A KafkaSpout is created with a KafkaSpoutConfig that sets maxRetries to 5, 
> yet the KafkaSpout retries a failed tuple forever.
> Reason:
> numFails is incremented in the fail() method of KafkaSpout.
> {code}
> public void fail(Object messageId) {
>     final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
>     emitted.remove(msgId);
>     if (msgId.numFails() < maxRetries) {
>         msgId.incrementNumFails();
>         retryService.schedule(msgId);
>     } else { // limit to max number of retries
>         LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
>         ack(msgId);
>     }
> }
> {code}
> However, emitTupleIfNotEmitted() creates a new KafkaSpoutMessageId for the 
> record and checks whether that msgId is ready to be emitted (in the case of 
> a failure). If so, it emits the new msgId instance, so the numFails 
> recorded on the previous instance is lost and the retry limit is never 
> reached.
> {code}
> private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
>     final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
>     final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
>     if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) {   // has been acked
>         LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
>     } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
>         LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
>     } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {
>         // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
>         final List<Object> tuple = tuplesBuilder.buildTuple(record);
>         kafkaSpoutStreams.emit(collector, tuple, msgId);
>         emitted.add(msgId);
>         numUncommittedOffsets++;
>         if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
>             retryService.remove(msgId);  // re-emitted hence remove from failed
>         }
>         LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
>     }
> }
> {code}
> isReady() has no side effects; it only looks the msgId up and reports 
> whether it is ready. Possible fixes: (1) modify the RetryService interface 
> so that it hands back the msgId it holds, (2) give isReady() a side effect 
> that attaches the numFails from the previous attempt to the new msgId, or 
> (3) add a 'failed' collection to KafkaSpout to keep track of failed msgs 
> (similar to acked) and emit the msgId stored there when isReady() is true.
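
The third option in the quoted description (keeping the failed msgId and re-emitting that same instance) can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions, not the storm-kafka-client code and not necessarily how the issue was actually fixed: RetryCountSketch, MsgId, the failed map, the reuseFailedId switch and emitsUntilGivenUp() are hypothetical names, and the model assumes message identity is based on the offset alone, so a fresh id compares equal to the failed one but starts with numFails at 0.

```java
import java.util.HashMap;
import java.util.Map;

class RetryCountSketch {
    // Hypothetical stand-in for KafkaSpoutMessageId: equality ignores numFails.
    static final class MsgId {
        final long offset;
        int numFails;
        MsgId(long offset) { this.offset = offset; }
        @Override public boolean equals(Object o) {
            return o instanceof MsgId && ((MsgId) o).offset == offset;
        }
        @Override public int hashCode() { return Long.hashCode(offset); }
    }

    // Hypothetical 'failed' bookkeeping: maps an id to the instance that carries numFails.
    private final Map<MsgId, MsgId> failed = new HashMap<>();
    private final int maxRetries;
    private final boolean reuseFailedId;

    RetryCountSketch(int maxRetries, boolean reuseFailedId) {
        this.maxRetries = maxRetries;
        this.reuseFailedId = reuseFailedId;
    }

    // Models the emit path: a fresh id is always built for the record.
    // With reuseFailedId the previously failed instance is emitted instead.
    MsgId emit(long offset) {
        MsgId fresh = new MsgId(offset);
        MsgId previous = failed.remove(fresh);
        return (reuseFailedId && previous != null) ? previous : fresh;
    }

    // Models fail(): returns true if the message is scheduled for another retry.
    boolean fail(MsgId id) {
        if (id.numFails < maxRetries) {
            id.numFails++;
            failed.put(id, id);
            return true;
        }
        return false; // retry limit reached; the real spout acks the message here
    }

    // Counts emits of a tuple that always fails; 'cap' stops the never-ending variant.
    static int emitsUntilGivenUp(int maxRetries, boolean reuseFailedId, int cap) {
        RetryCountSketch spout = new RetryCountSketch(maxRetries, reuseFailedId);
        int emits = 0;
        while (emits < cap) {
            MsgId id = spout.emit(42L);
            emits++;
            if (!spout.fail(id)) {
                break;
            }
        }
        return emits;
    }

    public static void main(String[] args) {
        System.out.println(emitsUntilGivenUp(5, true, 100));  // 6: one emit plus five retries
        System.out.println(emitsUntilGivenUp(5, false, 100)); // 100: only the cap stops it
    }
}
```

With reuseFailedId set to false the model reproduces the reported behaviour: every emit starts from numFails 0, so the maxRetries check in fail() never trips.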



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
