[ https://issues.apache.org/jira/browse/STORM-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15712731#comment-15712731 ]

P. Taylor Goetz commented on STORM-2077:
----------------------------------------

Quoting [~db3f] from STORM-2229 (provides additional detail on a proposed fix):

When the topology fails a tuple, it is never resent by the KafkaSpout. This can easily be shown by constructing a small topology that fails every tuple.

Apparent reason:

{code}
public class KafkaSpout<K, V> extends BaseRichSpout {
    //...
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                // Seek to the next offset that is ready to commit in the next commit cycle
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
            } else {
                // Intended to seek to the last committed offset <== actually seeks to the end of the partition
                kafkaConsumer.seekToEnd(toArrayList(rtp));
            }
        }
    }
{code}

The code seeks to the end of the partition instead of seeking to the first uncommitted offset.

Preliminary fix (it worked for me, but needs to be checked by an expert):

{code}
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                // Seek to the next offset that is ready to commit in the next commit cycle
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
            } else {
                final OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
                if (committed == null) {
                    // No offsets committed yet for this partition - start from the beginning
                    kafkaConsumer.seekToBeginning(toArrayList(rtp));
                } else {
                    // Seek to the first uncommitted offset
                    kafkaConsumer.seek(rtp, committed.offset() + 1);
                }
            }
        }
    }
{code}
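The difference between the two seek strategies can be sketched with a small, self-contained simulation (plain Java, no Storm or Kafka dependencies; {{SeekSimulation}} and {{replayFrom}} are hypothetical names for illustration only): positioning the consumer at the log end leaves nothing to redeliver, while positioning it at the last committed offset + 1 makes every failed-but-uncommitted offset available for retry.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single partition: offsets 0..logEndOffset-1 exist in the log,
// and everything up to and including the committed offset has been acked.
public class SeekSimulation {
    // A consumer positioned at 'position' will redeliver offsets
    // position..logEndOffset-1 on its next polls.
    static List<Long> replayFrom(long position, long logEndOffset) {
        List<Long> redelivered = new ArrayList<>();
        for (long o = position; o < logEndOffset; o++) {
            redelivered.add(o);
        }
        return redelivered;
    }

    public static void main(String[] args) {
        long logEndOffset = 10;   // offsets 0..9 exist in the partition
        long committed = 2;       // offsets 0..2 committed; 3..9 failed and await retry

        // Current behavior: seekToEnd() positions the consumer at the log end,
        // so the failed offsets 3..9 are never polled again.
        System.out.println("seekToEnd redelivers:         "
                + replayFrom(logEndOffset, logEndOffset));

        // Proposed fix: seek to committed + 1, the first uncommitted offset,
        // so the failed offsets are polled again and can be retried.
        System.out.println("seek(committed+1) redelivers: "
                + replayFrom(committed + 1, logEndOffset));
    }
}
```

With the numbers above, the first strategy redelivers nothing and the second redelivers offsets 3 through 9, which is the observable difference the bug report describes.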


> KafkaSpout doesn't retry failed tuples
> --------------------------------------
>
>                 Key: STORM-2077
>                 URL: https://issues.apache.org/jira/browse/STORM-2077
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.2
>            Reporter: Tobias Maier
>
> KafkaSpout does not retry all failed tuples.
> We used the following configuration:
>         Map<String, Object> props = new HashMap<>();
>         props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "c1");
>         props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
>         props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.bootstrapServer());
>         KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreams.Builder(FIELDS_KAFKA_EVENT, new String[]{"test-topic"}).build();
>         KafkaSpoutTuplesBuilder<byte[], byte[]> kafkaSpoutTuplesBuilder = new KafkaSpoutTuplesBuilder.Builder<>(new KeyValueKafkaSpoutTupleBuilder("test-topic")).build();
>         KafkaSpoutRetryService retryService = new KafkaSpoutLoggedRetryExponentialBackoff(KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1), KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1), 3, KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.seconds(1));
>         KafkaSpoutConfig<byte[], byte[]> config = new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, kafkaSpoutTuplesBuilder, retryService)
>                 .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
>                 .setMaxUncommittedOffsets(30)
>                 .setOffsetCommitPeriodMs(10)
>                 .setMaxRetries(3)
>                 .build();
>         kafkaSpout = new org.apache.storm.kafka.spout.KafkaSpout<>(config);
> The downstream bolt fails every tuple, and we expect that all of those tuples will be replayed. But that is not the case for every tuple.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
