Repository: storm
Updated Branches:
  refs/heads/master fca7c7673 -> 94b96087b


exponential backoff for failed messages


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cac70e77
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cac70e77
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cac70e77

Branch: refs/heads/master
Commit: cac70e77d3417e13082495aaac126f25a6d07eff
Parents: 4da836f
Author: Rick Kilgore <[email protected]>
Authored: Fri Aug 22 21:49:08 2014 -0700
Committer: Rick Kilgore <[email protected]>
Committed: Fri Aug 22 21:49:08 2014 -0700

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/PartitionManager.java   | 66 ++++++++++++++++++--
 .../src/jvm/storm/kafka/SpoutConfig.java        |  3 +
 2 files changed, 65 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cac70e77/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java 
b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 9b48678..e80b19a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -44,7 +44,8 @@ public class PartitionManager {
     private final CountMetric _fetchAPIMessageCount;
     Long _emittedToOffset;
     SortedSet<Long> _pending = new TreeSet<Long>();
-    SortedSet<Long> failed = new TreeSet<Long>();
+    private SortedSet<Long> failed = new TreeSet<Long>();
+    private Map<Long,MessageRetryRecord> retryRecords = new 
HashMap<Long,MessageRetryRecord>();
     Long _committedTo;
     LinkedList<MessageAndRealOffset> _waitingToEmit = new 
LinkedList<MessageAndRealOffset>();
     Partition _partition;
@@ -144,14 +145,30 @@ public class PartitionManager {
         }
     }
 
+    /**
+     * Fetch the failed messages ready for retry.  If there are no failed 
messages, or none are ready for retry, then it
+     * returns an empty SortedSet (i.e., not null).
+     */
+    private SortedSet<Long> failedMsgsReadyForRetry() {
+        SortedSet<Long> ready = new TreeSet<Long>();
+        for (Long offset : this.failed) {
+            if (this.retryRecords.get(offset).isReadyForRetry()) {
+                ready.add(offset);
+            }
+        }
+        return ready;
+    }
+
+
     private void fill() {
         long start = System.nanoTime();
         long offset;
-        final boolean had_failed = !failed.isEmpty();
+        final SortedSet<Long> failedReady = failedMsgsReadyForRetry();
 
         // Are there failed tuples? If so, fetch those first.
+        final boolean had_failed = !failedReady.isEmpty();
         if (had_failed) {
-            offset = failed.first();
+            offset = failedReady.first();
         } else {
             offset = _emittedToOffset;
         }
@@ -171,7 +188,7 @@ public class PartitionManager {
                     // Skip any old offsets.
                     continue;
                 }
-                if (!had_failed || failed.contains(cur_offset)) {
+                if (!had_failed || failedReady.contains(cur_offset)) {
                     numMessages += 1;
                     _pending.add(cur_offset);
                     _waitingToEmit.add(new MessageAndRealOffset(msg.message(), 
cur_offset));
@@ -191,6 +208,7 @@ public class PartitionManager {
             _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();
         }
         _pending.remove(offset);
+        retryRecords.remove(offset);
         numberAcked++;
     }
 
@@ -204,6 +222,8 @@ public class PartitionManager {
         } else {
             LOG.debug("failing at offset=" + offset + " with _pending.size()=" 
+ _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
             failed.add(offset);
+            MessageRetryRecord retryRecord = retryRecords.get(offset);
+            retryRecords.put(offset, retryRecord == null ? new 
MessageRetryRecord() : retryRecord.retryAgainRecord());
             numberFailed++;
             if (numberAcked == 0 && numberFailed > 
_spoutConfig.maxOffsetBehind) {
                 throw new RuntimeException("Too many tuple failures");
@@ -261,4 +281,42 @@ public class PartitionManager {
             this.offset = offset;
         }
     }
+
+    /**
+     * A MessageRetryRecord holds the data of how many times a message has
+     * failed and been retried, and when the last failure occurred.  It can
+     * determine whether it is ready to be retried by employing an exponential
+     * back-off calculation using config values stored in SpoutConfig:
+     * <ul>
+     *  <li>retryInitialDelayMs - time to delay before the first retry</li>
+     *  <li>retryDelayMultiplier - multiplier by which to increase the delay 
for each subsequent retry</li>
+     *  <li>retryMaxDelayMs - maximum retry delay (once this delay time is 
reached, subsequent retries will
+     *                        delay for this amount of time every time)
+     *  </li>
+     * </ul>
+     */
+    class MessageRetryRecord {
+        private final long failTimeUTC;
+        private final int attemptsAlreadyPerformed;
+
+        private MessageRetryRecord(int attemptsAlreadyPerformed) {
+            this.failTimeUTC = new Date().getTime();
+            this.attemptsAlreadyPerformed = attemptsAlreadyPerformed;
+        }
+
+        public MessageRetryRecord() {
+            this(1);
+        }
+
+        public MessageRetryRecord retryAgainRecord() {
+            return new MessageRetryRecord(this.attemptsAlreadyPerformed + 1);
+        }
+
+        public boolean isReadyForRetry() {
+            double delayMultiplier = 
Math.pow(_spoutConfig.retryDelayMultiplier, this.attemptsAlreadyPerformed - 1);
+            long delayThisRetryMs = (long) (_spoutConfig.retryInitialDelayMs * 
delayMultiplier);
+            delayThisRetryMs = Math.min(delayThisRetryMs, 
_spoutConfig.retryMaxDelayMs);
+            return new Date().getTime() - this.failTimeUTC > delayThisRetryMs;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/cac70e77/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java 
b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
index 1b66026..d8ca7eb 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
@@ -27,6 +27,9 @@ public class SpoutConfig extends KafkaConfig implements 
Serializable {
     public String zkRoot = null;
     public String id = null;
     public long stateUpdateIntervalMs = 2000;
+    public long retryInitialDelayMs = 10 * 1000;
+    public double retryDelayMultiplier = 1.5;
+    public long retryMaxDelayMs = 5 * 60 * 1000;
 
     public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String 
id) {
         super(hosts, topic);

Reply via email to