[ https://issues.apache.org/jira/browse/STORM-2229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias Klein updated STORM-2229:
----------------------------------
Description:
When the topology fails a tuple, it is never resent by the KafkaSpout. This can
easily be shown by constructing a small topology failing every tuple.
Apparent reason:
{code}
public class KafkaSpout<K, V> extends BaseRichSpout {
    //...
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                // seek to the next offset that is ready to commit in next commit cycle
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
            } else {
                // Seek to last committed offset <== Does seek to end of partition
                kafkaConsumer.seekToEnd(toArrayList(rtp));
            }
        }
    }
}
{code}
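The {{else}} branch calls {{seekToEnd}}, so the consumer jumps to the end of the partition instead of to the first uncommitted offset, and the failed offsets between the last committed offset and the end of the partition are never polled again.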
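The effect of the two seek targets can be illustrated with a small model of a single partition that needs no Kafka dependency. It is only a sketch: the class and method names are hypothetical, and it assumes offsets 0 to 3 were acked and committed while offsets 4 to 9 were emitted and then failed. The {{+ 1}} mirrors the comments in the snippet above, which treat the stored value as the last committed (processed) offset.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of one partition; no Kafka or Storm classes are involved. */
class SeekModel {

    /** Offsets a consumer would poll after seeking to {@code position}, up to the log end (exclusive). */
    static List<Long> offsetsPolledFrom(long position, long logEndOffset) {
        List<Long> polled = new ArrayList<>();
        for (long offset = position; offset < logEndOffset; offset++) {
            polled.add(offset);
        }
        return polled;
    }

    public static void main(String[] args) {
        long lastCommittedOffset = 3; // assumed: offsets 0..3 acked and committed
        long logEndOffset = 10;       // assumed: offsets 4..9 emitted, then failed

        // Current else branch: seekToEnd puts the position at the log end offset
        System.out.println(offsetsPolledFrom(logEndOffset, logEndOffset));            // []
        // Intended behaviour: resume at the first uncommitted offset
        System.out.println(offsetsPolledFrom(lastCommittedOffset + 1, logEndOffset)); // [4, 5, 6, 7, 8, 9]
    }
}
```

With {{seekToEnd}} nothing is re-polled, which matches the reported symptom that failed tuples are never resent.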
Preliminary fix (worked for me, but needs to be checked by an expert):
{code}
private void doSeekRetriableTopicPartitions() {
    final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
    for (TopicPartition rtp : retriableTopicPartitions) {
        final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
        if (offsetAndMeta != null) {
            // seek to the next offset that is ready to commit in next commit cycle
            kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
        } else {
            OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
            if (committed == null) {
                // No offsets committed yet for this partition - start from beginning
                kafkaConsumer.seekToBeginning(toArrayList(rtp));
            } else {
                // Seek to first uncommitted offset
                kafkaConsumer.seek(rtp, committed.offset() + 1);
            }
        }
    }
}
{code}
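The decision made by the proposed fix can be checked in isolation with the following sketch. It replaces the consumer calls with plain values ({{OptionalLong}} stands in for a possibly null {{OffsetAndMetadata}}), so the class and method names are hypothetical and nothing here is Storm or Kafka API. Like the fix, it assumes the committed value is the last processed offset. The {{KafkaConsumer}} javadoc recommends committing lastProcessedMessageOffset + 1 instead; if the spout followed that convention, the {{+ 1}} in the fix would skip one record, which is worth confirming during review.

```java
import java.util.OptionalLong;

/** Hypothetical, Kafka-free model of where the proposed fix positions a retriable partition. */
class RetrySeekPosition {

    /**
     * @param nextCommitOffset result of findNextCommitOffset(), if any
     * @param committedOffset  result of committed(tp), if any (assumed: last processed offset)
     * @param beginningOffset  first offset still available in the partition
     */
    static long seekPosition(OptionalLong nextCommitOffset, OptionalLong committedOffset, long beginningOffset) {
        if (nextCommitOffset.isPresent()) {
            // First branch: resume right after the offset that is ready to commit
            return nextCommitOffset.getAsLong() + 1;
        }
        if (!committedOffset.isPresent()) {
            // Nothing committed yet: start from the beginning of the partition
            return beginningOffset;
        }
        // Resume at the first uncommitted offset
        return committedOffset.getAsLong() + 1;
    }

    public static void main(String[] args) {
        System.out.println(seekPosition(OptionalLong.empty(), OptionalLong.of(3), 0));   // 4
        System.out.println(seekPosition(OptionalLong.empty(), OptionalLong.empty(), 0)); // 0
        System.out.println(seekPosition(OptionalLong.of(6), OptionalLong.of(3), 0));     // 7
    }
}
```

Keeping the three cases in one pure function makes the branch order easy to unit-test without a running broker.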
was:
When the topology fails a tuple, it is never resent by the KafkaSpout. This can
easily be shown by constructing a small topology failing every tuple.
Apparent reason:
{code}
public class KafkaSpout<K, V> extends BaseRichSpout {
    //...
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                // seek to the next offset that is ready to commit in next commit cycle
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
            } else {
                // Seek to last committed offset <== Does seek to end of partition
                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);
            }
        }
    }
}
{code}
The code seeks to the end of the partition instead of seeking to the first
uncommitted offset.
Preliminary fix (worked for me, but needs to be checked by an expert):
{code}
private void doSeekRetriableTopicPartitions() {
    final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
    for (TopicPartition rtp : retriableTopicPartitions) {
        final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
        if (offsetAndMeta != null) {
            // seek to the next offset that is ready to commit in next commit cycle
            kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);
        } else {
            OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
            if (committed == null) {
                // No offsets committed yet for this partition - start from beginning
                kafkaConsumer.seekToBeginning(toArrayList(rtp));
            } else {
                // Seek to first uncommitted offset
                kafkaConsumer.seek(rtp, committed.offset() + 1);
            }
        }
    }
}
{code}
> KafkaSpout does not resend failed tuples
> ----------------------------------------
>
> Key: STORM-2229
> URL: https://issues.apache.org/jira/browse/STORM-2229
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.0.0, 1.0.1, 1.0.2
> Reporter: Matthias Klein
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)