Kishore Senji created STORM-2352:
------------------------------------
Summary: New Kafka spout retries for ever even with retries of 5
Key: STORM-2352
URL: https://issues.apache.org/jira/browse/STORM-2352
Project: Apache Storm
Issue Type: Bug
Components: storm-kafka-client
Affects Versions: 1.0.0, 1.1.0
Reporter: Kishore Senji
v1.0.0 and above
A KafkaSpout is created from a KafkaSpoutConfig with maxRetries set to 5, yet
the KafkaSpout still retries a failed tuple forever.
Reason:
numFails is incremented in the fail() method of KafkaSpout:
{code}
public void fail(Object messageId) {
    final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
    emitted.remove(msgId);
    if (msgId.numFails() < maxRetries) {
        msgId.incrementNumFails();
        retryService.schedule(msgId);
    } else { // limit to max number of retries
        LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
        ack(msgId);
    }
}
{code}
However, emitTupleIfNotEmitted() creates a new KafkaSpoutMessageId for every
record, checks whether that msgId is ready to be emitted again (in the failure
case) and, if so, emits the new msgId instance. The numFails accumulated on the
previous instance is therefore lost:
{code}
private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
    final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
    if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) { // has been acked
        LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
    } else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
        LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
    } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {
        // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
        final List<Object> tuple = tuplesBuilder.buildTuple(record);
        kafkaSpoutStreams.emit(collector, tuple, msgId);
        emitted.add(msgId);
        numUncommittedOffsets++;
        if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
            retryService.remove(msgId); // re-emitted hence remove from failed
        }
        LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
    }
}
{code}
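The interaction of the two methods above can be reproduced in isolation. The following is only a minimal sketch, not Storm code: MsgId is a hypothetical stand-in for KafkaSpoutMessageId, and it assumes that equality is based on topic, partition and offset only (which the behaviour described here implies). The HashSet stands in for the retry service, and the cap parameter exists only so the demo terminates.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

public class RetryCounterLossDemo {

    /** Hypothetical stand-in for KafkaSpoutMessageId; equality ignores numFails (assumption). */
    static final class MsgId {
        private final String topic;
        private final int partition;
        private final long offset;
        private int numFails = 0;

        MsgId(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        int numFails() { return numFails; }

        void incrementNumFails() { numFails++; }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof MsgId)) return false;
            MsgId other = (MsgId) o;
            return partition == other.partition
                    && offset == other.offset
                    && topic.equals(other.topic);
        }

        @Override
        public int hashCode() { return Objects.hash(topic, partition, offset); }
    }

    /**
     * Simulates emit/fail cycles for one record. Returns the number of failures
     * after which the spout gives up, or -1 if it never gives up within cap cycles.
     */
    public static int failuresBeforeGiveUp(int maxRetries, int cap) {
        Set<MsgId> scheduled = new HashSet<>(); // stand-in for the retry service
        for (int failures = 1; failures <= cap; failures++) {
            // Emit path: a fresh id is built from the record, so numFails is 0 again.
            MsgId msgId = new MsgId("topic", 0, 42L);
            // Matches the old instance by equality and drops it, together with its count.
            scheduled.remove(msgId);

            // Fail path: same check as in fail() above.
            if (msgId.numFails() < maxRetries) {
                msgId.incrementNumFails();
                scheduled.add(msgId);
            } else {
                return failures; // would be acked and dropped
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(failuresBeforeGiveUp(5, 1000));
    }
}
```

With maxRetries of 5 the check in the fail path always sees a count of 0, so the give-up branch is never reached within the cap.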
isReady() has no side effects: it only looks up the msgId and returns true. The
fix is either to change the RetryService interface so that it hands back the
msgId instance it already holds, or to give isReady() a side effect that copies
the numFails from the previously failed instance onto the new one.
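One possible shape for the first option is sketched below. This is an illustration under stated assumptions, not the actual Storm interface: MsgId and SimpleRetryService are hypothetical stand-ins, and resolve() is an invented method name for "give me the instance you already hold for this record". MsgId equality is again assumed to cover topic, partition and offset only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class RetryLookupSketch {

    /** Hypothetical stand-in for KafkaSpoutMessageId; equality ignores numFails (assumption). */
    static final class MsgId {
        private final String topic;
        private final int partition;
        private final long offset;
        private int numFails = 0;

        MsgId(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }

        int numFails() { return numFails; }

        void incrementNumFails() { numFails++; }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof MsgId)) return false;
            MsgId other = (MsgId) o;
            return partition == other.partition
                    && offset == other.offset
                    && topic.equals(other.topic);
        }

        @Override
        public int hashCode() { return Objects.hash(topic, partition, offset); }
    }

    /** Hypothetical retry service that can return the instance it stored. */
    static final class SimpleRetryService {
        private final Map<MsgId, MsgId> scheduled = new HashMap<>();

        void schedule(MsgId msgId) { scheduled.put(msgId, msgId); }

        void remove(MsgId msgId) { scheduled.remove(msgId); }

        /** Returns the scheduled instance equal to probe, or probe itself if none is scheduled. */
        MsgId resolve(MsgId probe) { return scheduled.getOrDefault(probe, probe); }
    }

    /** Counts how often one always-failing record is emitted before the spout gives up. */
    public static int emitsUntilGiveUp(int maxRetries) {
        SimpleRetryService retryService = new SimpleRetryService();
        int emits = 0;
        while (true) {
            // Emit path: reuse the previously failed instance so numFails survives.
            MsgId msgId = retryService.resolve(new MsgId("topic", 0, 42L));
            retryService.remove(msgId);
            emits++;

            // Fail path: same check as in fail() above.
            if (msgId.numFails() < maxRetries) {
                msgId.incrementNumFails();
                retryService.schedule(msgId);
            } else {
                return emits; // would be acked and dropped
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(emitsUntilGiveUp(5));
    }
}
```

Because the emitted instance is the one that was scheduled, the counter keeps growing across retries and the record is given up after the initial emit plus maxRetries re-emits.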