[ 
https://issues.apache.org/jira/browse/STORM-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16211085#comment-16211085
 ] 

Andre Piwoni commented on STORM-2077:
-------------------------------------

I think this solution provides an alternative way to set the offset, but 
there is nothing wrong with seekToEnd; it probably worked as designed, and 
here is why. seekToEnd uses OffsetResetStrategy.LATEST to set the end offset. 
This means that if Kafka maintains an offset for the consumer's partition (in 
ZooKeeper or in the internal Kafka topic __consumer_offsets), seekToEnd goes 
to the latest committed offset; otherwise, when no consumer offset exists, it 
goes to the end of the partition. There are many ways Kafka can lose track of 
consumer offsets, and you can reproduce this by setting the offset expiration 
parameter to some ridiculously short period. This is probably what happened.
The alternative solution also works because it sets the offset explicitly, and 
I hope out-of-range cases are handled.
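
To make the "set the offset explicitly" idea concrete, here is a minimal 
sketch in plain Java. It does not use the Kafka client or the spout code: the 
class StartOffsetSketch, the method resolveStartOffset, and the logStart/logEnd 
inputs are hypothetical names chosen for illustration. It assumes the intended 
policy is "resume from the committed offset if one exists, otherwise start at 
the end of the partition", and it assumes an out-of-range committed offset 
should also fall back to the end.

```java
import java.util.OptionalLong;

public class StartOffsetSketch {

    /**
     * Picks the offset to seek to explicitly (illustrative only).
     *
     * @param committed last committed offset, empty if none is tracked any more
     * @param logStart  first offset still available in the partition (assumed input)
     * @param logEnd    offset just past the last record in the partition (assumed input)
     */
    static long resolveStartOffset(OptionalLong committed, long logStart, long logEnd) {
        if (!committed.isPresent()) {
            // No consumer offset exists (for example, it expired): go to the end.
            return logEnd;
        }
        long offset = committed.getAsLong();
        if (offset < logStart || offset > logEnd) {
            // Out-of-range case: assumed here to fall back to the end as well.
            return logEnd;
        }
        // A valid committed offset exists: resume from it.
        return offset;
    }

    public static void main(String[] args) {
        System.out.println(resolveStartOffset(OptionalLong.of(42), 10, 100));  // prints 42
        System.out.println(resolveStartOffset(OptionalLong.empty(), 10, 100)); // prints 100
        System.out.println(resolveStartOffset(OptionalLong.of(5), 10, 100));   // prints 100
    }
}
```

Whether the out-of-range fallback should be the end or the beginning of the 
partition is a design choice; the sketch picks the end only to stay close to 
the "latest" behavior discussed above.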


> KafkaSpout doesn't retry failed tuples
> --------------------------------------
>
>                 Key: STORM-2077
>                 URL: https://issues.apache.org/jira/browse/STORM-2077
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.2
>            Reporter: Tobias Maier
>
> KafkaSpout does not retry all failed tuples.
> We used the following configuration:
>         Map<String, Object> props = new HashMap<>();
>         props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "c1");
>         props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
>                 ByteArrayDeserializer.class.getName());
>         props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
>                 ByteArrayDeserializer.class.getName());
>         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
>                 broker.bootstrapServer());
>         KafkaSpoutStreams kafkaSpoutStreams =
>                 new KafkaSpoutStreams.Builder(FIELDS_KAFKA_EVENT,
>                         new String[]{"test-topic"}).build();
>         KafkaSpoutTuplesBuilder<byte[], byte[]> kafkaSpoutTuplesBuilder =
>                 new KafkaSpoutTuplesBuilder.Builder<>(
>                         new KeyValueKafkaSpoutTupleBuilder("test-topic")).build();
>         KafkaSpoutRetryService retryService =
>                 new KafkaSpoutLoggedRetryExponentialBackoff(
>                         KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1),
>                         KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1),
>                         3,
>                         KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.seconds(1));
>         KafkaSpoutConfig<byte[], byte[]> config =
>                 new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams,
>                         kafkaSpoutTuplesBuilder, retryService)
>                 .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
>                 .setMaxUncommittedOffsets(30)
>                 .setOffsetCommitPeriodMs(10)
>                 .setMaxRetries(3)
>                 .build();
>         kafkaSpout = new org.apache.storm.kafka.spout.KafkaSpout<>(config);
> The downstream bolt fails every tuple, and we expect all of those tuples to 
> be replayed. But that is not the case for every tuple.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
