[ 
https://issues.apache.org/jira/browse/STORM-2352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kishore Senji updated STORM-2352:
---------------------------------
    Description: 
v1.0.0 and above

A KafkaSpout is created with a KafkaSpoutConfig whose maxRetries is 5, yet the 
KafkaSpout retries a failed tuple forever. 

Reason:
numFails is incremented in the fail() method of KafkaSpout:
{code}
public void fail(Object messageId) {
    final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    emitted.remove(msgId);
    if (msgId.numFails() < maxRetries) {
        msgId.incrementNumFails();
        retryService.schedule(msgId);
    } else { // limit to max number of retries
        LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
        ack(msgId);
    }
}
{code}

However, emitTupleIfNotEmitted() creates a new KafkaSpoutMessageId for every 
record and checks whether that msgId is ready to be emitted (in the failure 
case). If it is, the new msgId instance is emitted, so the numFails recorded on 
the previous instance is lost and the count starts from zero on every retry.

{code}
    private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
        final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
        final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);

        if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) {   // has been acked
            LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
        } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
            LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
        } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {
            // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
            final List<Object> tuple = tuplesBuilder.buildTuple(record);
            kafkaSpoutStreams.emit(collector, tuple, msgId);
            emitted.add(msgId);
            numUncommittedOffsets++;
            if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
                retryService.remove(msgId);  // re-emitted hence remove from failed
            }
            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
        }
    }
{code}
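
The effect can be reproduced outside Storm. The following is a minimal sketch, not the storm-kafka-client code: MsgId is a hypothetical stand-in for KafkaSpoutMessageId, and emitsBeforeGivingUp() is an illustrative helper that repeats the emit/fail cycle the way the two methods above do, building a fresh id on every emit.

```java
class LostRetryCountDemo {
    // Hypothetical stand-in for KafkaSpoutMessageId: only the fail counter matters here.
    static final class MsgId {
        final String topic;
        final int partition;
        final long offset;
        private int numFails = 0;

        MsgId(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        int numFails() { return numFails; }
        void incrementNumFails() { numFails++; }
    }

    /**
     * Repeats the emit/fail cycle with a fresh MsgId per emit.
     * Returns the number of emits before the retry limit is hit,
     * or -1 if the limit is never hit within maxEmits.
     */
    static int emitsBeforeGivingUp(int maxRetries, int maxEmits) {
        for (int emits = 1; emits <= maxEmits; emits++) {
            MsgId msgId = new MsgId("topic", 0, 42L); // new instance, numFails == 0
            // The tuple fails; this mirrors the check in fail().
            if (msgId.numFails() < maxRetries) {
                msgId.incrementNumFails();            // count lives only on this instance
            } else {
                return emits;                         // would be acked, retries stop
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // With maxRetries of 5 the limit is never reached: prints -1.
        System.out.println(emitsBeforeGivingUp(5, 1000));
    }
}
```

Because the counter is discarded together with the instance, numFails never gets past 1, so any maxRetries of 1 or more is never reached.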

isReady() has no side effects: it only looks up the msgId and returns true. 
Possible fixes: modify the RetryService interface so that it hands the 
scheduled msgId back to the caller, OR give isReady() a side effect that 
attaches the numFails from the previous attempt to the new msgId, OR add a 
'failed' collection to KafkaSpout that keeps track of failed msgs (similar to 
acked) and emit the msgId taken from 'failed' when isReady() is true.
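
The third option could look roughly like the sketch below. It is an illustration under assumptions, not a patch: MsgId is a hypothetical stand-in for KafkaSpoutMessageId whose equals()/hashCode() are assumed to ignore numFails, the 'failed' map and the emit()/fail() helpers are invented names, and the retry timing handled by RetryService is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class FailedTrackingSketch {
    // Hypothetical stand-in for KafkaSpoutMessageId; equality ignores numFails (assumption).
    static final class MsgId {
        final String topic;
        final int partition;
        final long offset;
        private int numFails = 0;

        MsgId(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        int numFails() { return numFails; }
        void incrementNumFails() { numFails++; }

        @Override public boolean equals(Object o) {
            if (!(o instanceof MsgId)) return false;
            MsgId other = (MsgId) o;
            return topic.equals(other.topic) && partition == other.partition && offset == other.offset;
        }

        @Override public int hashCode() { return Objects.hash(topic, partition, offset); }
    }

    private final int maxRetries;
    // Maps an id to the instance that carries its fail count.
    private final Map<MsgId, MsgId> failed = new HashMap<>();

    FailedTrackingSketch(int maxRetries) { this.maxRetries = maxRetries; }

    /** Emit path: reuse the previously failed instance if there is one. */
    MsgId emit(String topic, int partition, long offset) {
        MsgId fresh = new MsgId(topic, partition, offset);
        MsgId previous = failed.remove(fresh);   // lookup works because equality ignores numFails
        return previous != null ? previous : fresh;
    }

    /** Fail path: returns true if a retry was recorded, false once retries are exhausted. */
    boolean fail(MsgId msgId) {
        if (msgId.numFails() < maxRetries) {
            msgId.incrementNumFails();
            failed.put(msgId, msgId);
            return true;
        }
        return false;
    }

    /** Counts emits of one always-failing record until the spout gives up. */
    static int emitsUntilGivenUp(int maxRetries) {
        FailedTrackingSketch spout = new FailedTrackingSketch(maxRetries);
        int emits = 0;
        while (true) {
            MsgId id = spout.emit("topic", 0, 42L);
            emits++;
            if (!spout.fail(id)) {
                return emits;
            }
        }
    }

    public static void main(String[] args) {
        // One initial emit plus 5 retries: prints 6.
        System.out.println(emitsUntilGivenUp(5));
    }
}
```

Keeping the failed instance in the spout avoids changing the RetryService interface, at the cost of a second structure that has to stay in sync with what the RetryService has scheduled.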



> New Kafka spout retries for ever even with retries of 5
> -------------------------------------------------------
>
>                 Key: STORM-2352
>                 URL: https://issues.apache.org/jira/browse/STORM-2352
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.0, 1.1.0
>            Reporter: Kishore Senji



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
