[ 
https://issues.apache.org/jira/browse/STORM-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15725148#comment-15725148
 ] 

Renjie Liu commented on STORM-2077:
-----------------------------------

I think the patch is incorrect, since the consumer has already performed the 
seek to the correct position in the ConsumerRebalanceListener. If the committed 
offset is not found, we should just ignore this partition.
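
To illustrate the point, here is a minimal sketch. It is not the actual 
KafkaSpout code and does not use the Kafka client. The class name, the method 
name, and the use of plain strings for partitions are all hypothetical. It 
only shows the rule: a partition with no committed offset is skipped, so 
whatever position was set during the rebalance is left alone.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, dependency-free model; these names are NOT part of storm-kafka-client.
final class CommittedOffsetSketch {
    private CommittedOffsetSketch() {}

    /**
     * Returns the committed offset for every assigned partition that has one.
     * Partitions without a committed offset are left out of the result, which
     * models "just ignore this partition": the caller does not touch them, so
     * the position chosen earlier (assumed here to be set in the rebalance
     * listener) stays in effect.
     */
    static Map<String, Long> committedOnly(List<String> assigned,
                                           Map<String, Long> committed) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (String partition : assigned) {
            Long offset = committed.get(partition);
            if (offset == null) {
                continue; // no committed offset found: skip, do not seek again
            }
            result.put(partition, offset);
        }
        return result;
    }
}
```

With partitions "test-topic-0" (committed) and "test-topic-1" (never 
committed), only "test-topic-0" would appear in the result.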

> KafkaSpout doesn't retry failed tuples
> --------------------------------------
>
>                 Key: STORM-2077
>                 URL: https://issues.apache.org/jira/browse/STORM-2077
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.2
>            Reporter: Tobias Maier
>
> KafkaSpout does not retry all failed tuples.
> We used the following configuration:
>         Map<String, Object> props = new HashMap<>();
>         props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "c1");
>         props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
>         props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.bootstrapServer());
>         KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreams.Builder(FIELDS_KAFKA_EVENT, new String[]{"test-topic"}).build();
>         KafkaSpoutTuplesBuilder<byte[], byte[]> kafkaSpoutTuplesBuilder = new KafkaSpoutTuplesBuilder.Builder<>(new KeyValueKafkaSpoutTupleBuilder("test-topic")).build();
>         KafkaSpoutRetryService retryService = new KafkaSpoutLoggedRetryExponentialBackoff(
>                 KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1),
>                 KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1),
>                 3,
>                 KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.seconds(1));
>         KafkaSpoutConfig<byte[], byte[]> config = new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, kafkaSpoutTuplesBuilder, retryService)
>                 .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
>                 .setMaxUncommittedOffsets(30)
>                 .setOffsetCommitPeriodMs(10)
>                 .setMaxRetries(3)
>                 .build();
>         kafkaSpout = new org.apache.storm.kafka.spout.KafkaSpout<>(config);
> The downstream bolt fails every tuple, and we expect all of those tuples to 
> be replayed. However, some tuples are never replayed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
