[ https://issues.apache.org/jira/browse/STORM-2229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias Klein updated STORM-2229:
----------------------------------
    Description: 
When the topology fails a tuple, it is never resent by the KafkaSpout. This can 
easily be shown by constructing a small topology failing every tuple.

Apparent reason:

{code}
public class KafkaSpout<K, V> extends BaseRichSpout {
    // ...
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                // seek to the next offset that is ready to commit in next commit cycle
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
            } else {
                // Seek to last committed offset <== Does seek to end of partition
                kafkaConsumer.seekToEnd(toArrayList(rtp));
            }
        }
    }
}
{code}

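When no acked offset is ready to commit (findNextCommitOffset() returns null),
the code seeks to the end of the partition instead of to the first uncommitted
offset, so the failed tuples are skipped and never re-emitted.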
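The skipping can be reproduced without a broker. The following is a minimal
sketch, not Storm or Kafka code: RetrySeekDemo and its methods are hypothetical,
and the model assumes one partition holding offsets 0 to 9, a committed offset
of 2 that denotes the last processed record (as the "+ 1" in the spout code
implies), and a failed tuple at offset 3. It lists the offsets the next poll
would fetch after each seek choice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one partition and a consumer position.
// This is NOT the Kafka or Storm API; it only mimics seek/seekToEnd/poll semantics.
public class RetrySeekDemo {
    // Records 0..9 exist, so the log end offset is 10.
    static final long END_OFFSET = 10;

    // Offsets a poll would return when reading from `position` to the end of the partition.
    static List<Long> pollFrom(long position) {
        List<Long> fetched = new ArrayList<>();
        for (long offset = position; offset < END_OFFSET; offset++) {
            fetched.add(offset);
        }
        return fetched;
    }

    // Behaviour reported in the issue: with no acked offset ready to commit,
    // the spout seeks to the end; committedOffset is ignored, which is the bug.
    static List<Long> retryWithSeekToEnd(long committedOffset) {
        return pollFrom(END_OFFSET);
    }

    // Proposed behaviour: resume at the first uncommitted offset
    // (assumes committedOffset is the last processed record).
    static List<Long> retryWithSeekToFirstUncommitted(long committedOffset) {
        return pollFrom(committedOffset + 1);
    }

    public static void main(String[] args) {
        long committedOffset = 2; // last processed record; offset 3 failed
        System.out.println("seekToEnd:           " + retryWithSeekToEnd(committedOffset));
        System.out.println("seek(committed + 1): " + retryWithSeekToFirstUncommitted(committedOffset));
    }
}
```

Running main prints an empty list for the seekToEnd branch and
[3, 4, 5, 6, 7, 8, 9] for the seek to the first uncommitted offset, i.e. only
the latter fetches the failed record again.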

Preliminary fix (worked for me, but needs to be checked by an expert)

{code}
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                // seek to the next offset that is ready to commit in next commit cycle
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
            } else {
                OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
                if (committed == null) {
                    // No offsets committed yet for this partition: start from beginning
                    kafkaConsumer.seekToBeginning(toArrayList(rtp));
                } else {
                    // Seek to first uncommitted offset
                    kafkaConsumer.seek(rtp, committed.offset() + 1);
                }
            }
        }
    }
{code}
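The branches of the preliminary fix can be checked in isolation. Below is a
hypothetical restatement as a pure function (RetrySeekPosition and
retrySeekPosition are illustrative names, not part of Storm): nextCommitOffset
stands in for the result of findNextCommitOffset(), committedOffset for
kafkaConsumer.committed(rtp), with null meaning "no offset" in both cases, and
beginningOffset for the position seekToBeginning would produce. Like the fix,
it assumes a committed offset marks the last processed record, hence the "+ 1".

```java
// Hypothetical restatement of the fix's seek decision as a pure function.
// null models "no OffsetAndMetadata" for both the acked and the committed lookup.
public class RetrySeekPosition {

    static long retrySeekPosition(Long nextCommitOffset, Long committedOffset, long beginningOffset) {
        if (nextCommitOffset != null) {
            // An acked offset is ready to commit: resume right after it.
            return nextCommitOffset + 1;
        }
        if (committedOffset == null) {
            // Nothing committed yet for this partition: start from the beginning.
            return beginningOffset;
        }
        // Resume at the first uncommitted offset.
        return committedOffset + 1;
    }

    public static void main(String[] args) {
        // Offsets 3..5 acked and ready to commit after committed offset 2.
        System.out.println(retrySeekPosition(5L, 2L, 0L));     // prints 6
        // Offset 3 failed, nothing new acked: replay from 3.
        System.out.println(retrySeekPosition(null, 2L, 0L));   // prints 3
        // Fresh partition with no committed offset.
        System.out.println(retrySeekPosition(null, null, 0L)); // prints 0
    }
}
```

The middle case is the one the issue is about: with the fix, a retry after a
failure lands on the failed offset instead of the end of the partition.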


  was:
When the topology fails a tuple, it is never resent by the KafkaSpout. This can 
easily be shown by constructing a small topology failing every tuple.

Apparent reason:

{code}
public class KafkaSpout<K, V> extends BaseRichSpout {
    // ...
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                // seek to the next offset that is ready to commit in next commit cycle
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
            } else {
                // Seek to last committed offset <== Does seek to end of partition
                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);
            }
        }
    }
}
{code}

The code seeks to the end of the partition instead of seeking to the first 
uncommitted offset.

Preliminary fix (worked for me, but needs to be checked by an expert)

{code}
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                // seek to the next offset that is ready to commit in next commit cycle
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
            } else {
                OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
                if (committed == null) {
                    // No offsets committed yet for this partition: start from beginning
                    kafkaConsumer.seekToBeginning(toArrayList(rtp));
                } else {
                    // Seek to first uncommitted offset
                    kafkaConsumer.seek(rtp, committed.offset() + 1);
                }
            }
        }
    }
{code}



> KafkaSpout does not resend failed tuples
> ----------------------------------------
>
>                 Key: STORM-2229
>                 URL: https://issues.apache.org/jira/browse/STORM-2229
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.0, 1.0.1, 1.0.2
>            Reporter: Matthias Klein
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
