[
https://issues.apache.org/jira/browse/STORM-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15712731#comment-15712731
]
P. Taylor Goetz commented on STORM-2077:
----------------------------------------
Quoting [~db3f] from STORM-2229 (provides additional detail on a proposed fix):
When the topology fails a tuple, it is never re-emitted by the KafkaSpout. This can
easily be shown by constructing a small topology that fails every tuple, for example:
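A bolt along these lines is enough to reproduce it when wired behind the KafkaSpout (a sketch, not the reporter's code; the class and component names are made up for illustration):
{code}
import java.util.Map;

import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class FailEveryTupleTopology {

    /** Bolt that fails every tuple it receives, so the spout is expected to retry each one. */
    public static class FailEveryTupleBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            collector.fail(tuple); // never ack, always fail
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // no output streams
        }
    }

    /** Wires KafkaSpout -> FailEveryTupleBolt; the spout is configured as in the issue description below. */
    public static StormTopology build(KafkaSpout<byte[], byte[]> kafkaSpout) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", kafkaSpout);
        builder.setBolt("fail-bolt", new FailEveryTupleBolt()).shuffleGrouping("kafka-spout");
        return builder.createTopology();
    }
}
{code}
With this in place every emitted tuple is failed; the expectation is that each one is replayed up to the configured retry limit, but the failed offsets are never polled again.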
Apparent reason:
{code}
public class KafkaSpout<K, V> extends BaseRichSpout {
    //...
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                // seek to the next offset that is ready to commit in next commit cycle
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
            } else {
                // "Seek to last committed offset" <== actually seeks to the end of the partition
                kafkaConsumer.seekToEnd(toArrayList(rtp));
            }
        }
    }
}
{code}
The code seeks to the end of the partition instead of seeking to the first
uncommitted offset, so the failed records are skipped rather than replayed.
Preliminary fix (worked for me, but needs to be checked by an expert):
{code}
private void doSeekRetriableTopicPartitions() {
    final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
    for (TopicPartition rtp : retriableTopicPartitions) {
        final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
        if (offsetAndMeta != null) {
            // seek to the next offset that is ready to commit in next commit cycle
            kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
        } else {
            OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
            if (committed == null) {
                // No offsets committed yet for this partition - start from the beginning
                kafkaConsumer.seekToBeginning(toArrayList(rtp));
            } else {
                // Seek to the first uncommitted offset
                kafkaConsumer.seek(rtp, committed.offset() + 1);
            }
        }
    }
}
{code}
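For context (not part of the proposed patch): KafkaConsumer.committed(partition) returns null when the consumer group has never committed an offset for that partition, which is why the fix falls back to seekToBeginning; otherwise committed.offset() + 1 mirrors the offsetAndMeta.offset() + 1 convention already used in the first branch. A minimal sketch of the two seek behaviours on a plain consumer, assuming a local broker on localhost:9092, the test-topic from the issue, and a kafka-clients version whose seekToEnd/seekToBeginning take a Collection (as in the snippets above):
{code}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SeekBehaviourSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "c1");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("test-topic", 0);
            consumer.assign(Collections.singletonList(tp));

            // Current behaviour: jump to the log-end offset. Records that were polled but
            // whose tuples failed (and were therefore never committed) are skipped.
            consumer.seekToEnd(Collections.singletonList(tp));

            // Proposed behaviour: resume relative to the last committed offset instead,
            // so the uncommitted records are polled and emitted again on the next cycle.
            OffsetAndMetadata committed = consumer.committed(tp);
            if (committed == null) {
                consumer.seekToBeginning(Collections.singletonList(tp)); // nothing committed yet
            } else {
                consumer.seek(tp, committed.offset() + 1); // first offset after the committed one
            }
        }
    }
}
{code}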
> KafkaSpout doesn't retry failed tuples
> --------------------------------------
>
> Key: STORM-2077
> URL: https://issues.apache.org/jira/browse/STORM-2077
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.0.2
> Reporter: Tobias Maier
>
> KafkaSpout does not retry all failed tuples.
> We used the following configuration:
> Map<String, Object> props = new HashMap<>();
> props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "c1");
> props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, ByteArrayDeserializer.class.getName());
> props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, ByteArrayDeserializer.class.getName());
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.bootstrapServer());
> KafkaSpoutStreams kafkaSpoutStreams =
>     new KafkaSpoutStreams.Builder(FIELDS_KAFKA_EVENT, new String[]{"test-topic"}).build();
> KafkaSpoutTuplesBuilder<byte[], byte[]> kafkaSpoutTuplesBuilder =
>     new KafkaSpoutTuplesBuilder.Builder<>(new KeyValueKafkaSpoutTupleBuilder("test-topic")).build();
> KafkaSpoutRetryService retryService = new KafkaSpoutLoggedRetryExponentialBackoff(
>     KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1),
>     KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1), 3,
>     KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.seconds(1));
> KafkaSpoutConfig<byte[], byte[]> config =
>     new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, kafkaSpoutTuplesBuilder, retryService)
>         .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
>         .setMaxUncommittedOffsets(30)
>         .setOffsetCommitPeriodMs(10)
>         .setMaxRetries(3)
>         .build();
> kafkaSpout = new org.apache.storm.kafka.spout.KafkaSpout<>(config);
> The downstream bolt fails every tuple and we expect that those tuples will
> all be replayed. But that is not the case for every tuple.