Repository: storm Updated Branches: refs/heads/master fca7c7673 -> 94b96087b
exponential backoff for failed messages Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cac70e77 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cac70e77 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cac70e77 Branch: refs/heads/master Commit: cac70e77d3417e13082495aaac126f25a6d07eff Parents: 4da836f Author: Rick Kilgore <[email protected]> Authored: Fri Aug 22 21:49:08 2014 -0700 Committer: Rick Kilgore <[email protected]> Committed: Fri Aug 22 21:49:08 2014 -0700 ---------------------------------------------------------------------- .../src/jvm/storm/kafka/PartitionManager.java | 66 ++++++++++++++++++-- .../src/jvm/storm/kafka/SpoutConfig.java | 3 + 2 files changed, 65 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/cac70e77/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index 9b48678..e80b19a 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -44,7 +44,8 @@ public class PartitionManager { private final CountMetric _fetchAPIMessageCount; Long _emittedToOffset; SortedSet<Long> _pending = new TreeSet<Long>(); - SortedSet<Long> failed = new TreeSet<Long>(); + private SortedSet<Long> failed = new TreeSet<Long>(); + private Map<Long,MessageRetryRecord> retryRecords = new HashMap<Long,MessageRetryRecord>(); Long _committedTo; LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>(); Partition _partition; @@ -144,14 +145,30 @@ public class PartitionManager { } } + /** + * Fetch the failed messages ready for retry. If there are no failed messages, or none are ready for retry, then it + * returns an empty List (i.e., not null). + */ + private SortedSet<Long> failedMsgsReadyForRetry() { + SortedSet<Long> ready = new TreeSet<Long>(); + for (Long offset : this.failed) { + if (this.retryRecords.get(offset).isReadyForRetry()) { + ready.add(offset); + } + } + return ready; + } + + private void fill() { long start = System.nanoTime(); long offset; - final boolean had_failed = !failed.isEmpty(); + final SortedSet<Long> failedReady = failedMsgsReadyForRetry(); // Are there failed tuples? If so, fetch those first. + final boolean had_failed = !failedReady.isEmpty(); if (had_failed) { - offset = failed.first(); + offset = failedReady.first(); } else { offset = _emittedToOffset; } @@ -171,7 +188,7 @@ public class PartitionManager { // Skip any old offsets. continue; } - if (!had_failed || failed.contains(cur_offset)) { + if (!had_failed || failedReady.contains(cur_offset)) { numMessages += 1; _pending.add(cur_offset); _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset)); @@ -191,6 +208,7 @@ public class PartitionManager { _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear(); } _pending.remove(offset); + retryRecords.remove(offset); numberAcked++; } @@ -204,6 +222,8 @@ public class PartitionManager { } else { LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset); failed.add(offset); + MessageRetryRecord retryRecord = retryRecords.get(offset); + retryRecords.put(offset, retryRecord == null ? new MessageRetryRecord() : retryRecord.retryAgainRecord()); numberFailed++; if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) { throw new RuntimeException("Too many tuple failures"); @@ -261,4 +281,42 @@ public class PartitionManager { this.offset = offset; } } + + /** + * A MessageRetryRecord holds the data of how many times a message has + * failed and been retried, and when the last failure occurred. It can + * determine whether it is ready to be retried by employing an exponential + * back-off calculation using config values stored in SpoutConfig: + * <ul> + * <li>retryInitialDelayMs - time to delay before the first retry</li> + * <li>retryDelayMultiplier - multiplier by which to increase the delay for each subsequent retry</li> + * <li>retryMaxDelayMs - maximum retry delay (once this delay time is reached, subsequent retries will + * delay for this amount of time every time) + * </li> + * </ul> + */ + class MessageRetryRecord { + private final long failTimeUTC; + private final int attemptsAlreadyPerformed; + + private MessageRetryRecord(int attemptsAlreadyPerformed) { + this.failTimeUTC = new Date().getTime(); + this.attemptsAlreadyPerformed = attemptsAlreadyPerformed; + } + + public MessageRetryRecord() { + this(1); + } + + public MessageRetryRecord retryAgainRecord() { + return new MessageRetryRecord(this.attemptsAlreadyPerformed + 1); + } + + public boolean isReadyForRetry() { + double delayMultiplier = Math.pow(_spoutConfig.retryDelayMultiplier, this.attemptsAlreadyPerformed - 1); + long delayThisRetryMs = (long) (_spoutConfig.retryInitialDelayMs * delayMultiplier); + delayThisRetryMs = Math.min(delayThisRetryMs, _spoutConfig.retryMaxDelayMs); + return new Date().getTime() - this.failTimeUTC > delayThisRetryMs; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/cac70e77/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java index 1b66026..d8ca7eb 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java +++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java @@ -27,6 +27,9 @@ public class SpoutConfig extends KafkaConfig implements Serializable { public String zkRoot = null; public String id = null; public long stateUpdateIntervalMs = 2000; + public long retryInitialDelayMs = 10 * 1000; + public double retryDelayMultiplier = 1.5; + public long retryMaxDelayMs = 5 * 60 * 1000; public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { super(hosts, topic);
