separate retry logic into manager class

separate retry logic into manager class ExponentialBackoffMsgRetryManager
fixed logic with regard to TOPOLOGY_MESSAGE_TIMEOUT_SECS (this is not a max
    time across retries, but rather a max time within a retry - so there is
    no conflict and no need to account for it in the retry logic).
added tests for the manager in ExponentialBackoffMsgRetryManagerTest


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/02bffc60
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/02bffc60
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/02bffc60

Branch: refs/heads/master
Commit: 02bffc601e71d89948a7c5c214d1ae69ccc8ef1d
Parents: 882dfc5
Author: Rick Kilgore <[email protected]>
Authored: Fri Nov 21 18:00:51 2014 -0800
Committer: Rick Kilgore <[email protected]>
Committed: Fri Nov 21 18:00:51 2014 -0800

----------------------------------------------------------------------
 external/storm-kafka/pom.xml                    |  21 +-
 .../ExponentialBackoffMsgRetryManager.java      | 154 +++++++++++++++
 .../jvm/storm/kafka/FailedMsgRetryManager.java  |  26 +++
 .../src/jvm/storm/kafka/PartitionManager.java   | 146 ++------------
 .../src/jvm/storm/kafka/SpoutConfig.java        |   3 -
 .../ExponentialBackoffMsgRetryManagerTest.java  | 194 +++++++++++++++++++
 6 files changed, 408 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/02bffc60/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 05bb117..f17af74 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -31,7 +31,26 @@
     <description>Storm Spouts for Apache Kafka</description>
     <build>
         <plugins>
-
+            <plugin>
+                <groupId>org.jacoco</groupId>
+                <artifactId>jacoco-maven-plugin</artifactId>
+                <version>0.7.2.201409121644</version>
+                <executions>
+                    <execution>
+                        <id>jacoco-initialize</id>
+                        <goals>
+                            <goal>prepare-agent</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>jacoco-report</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>report</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
         </plugins>
         <sourceDirectory>src/jvm</sourceDirectory>
         <testSourceDirectory>src/test</testSourceDirectory>

http://git-wip-us.apache.org/repos/asf/storm/blob/02bffc60/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
 
b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
new file mode 100644
index 0000000..cace9cd
--- /dev/null
+++ 
b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ExponentialBackoffMsgRetryManager implements 
FailedMsgRetryManager {
+
+    private final long retryInitialDelayMs;
+    private final double retryDelayMultiplier;
+    private final long retryDelayMaxMs;
+
+    private Queue<MessageRetryRecord> waiting = new 
PriorityQueue<MessageRetryRecord>();
+    private SortedMap<Long,MessageRetryRecord> records = new 
TreeMap<Long,MessageRetryRecord>();
+
+    public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double 
retryDelayMultiplier, long retryDelayMaxMs) {
+        this.retryInitialDelayMs = retryInitialDelayMs;
+        this.retryDelayMultiplier = retryDelayMultiplier;
+        this.retryDelayMaxMs = retryDelayMaxMs;
+    }
+
+    @Override
+    public void failed(Long offset) {
+        MessageRetryRecord oldRecord = this.records.get(offset);
+        MessageRetryRecord newRecord = oldRecord == null ?
+                                       new MessageRetryRecord(offset) :
+                                       oldRecord.createNextRetryRecord();
+        this.records.put(offset, newRecord);
+        this.waiting.add(newRecord);
+    }
+
+    @Override
+    public void acked(Long offset) {
+        MessageRetryRecord record = this.records.remove(offset);
+        if (record != null) {
+            this.waiting.remove(record);
+        }
+    }
+
+    @Override
+    public void retryStarted(Long offset) {
+        MessageRetryRecord record = this.records.get(offset);
+        if (record == null || !this.waiting.contains(record)) {
+            throw new IllegalStateException("cannot retry a message that has 
not failed");
+        } else {
+            this.waiting.remove(record);
+        }
+    }
+
+    @Override
+    public Long nextFailedMessageToRetry() {
+        if (this.waiting.size() > 0) {
+            MessageRetryRecord first = this.waiting.peek();
+            if (System.currentTimeMillis() >= first.retryTimeUTC) {
+                if (this.records.containsKey(first.offset)) {
+                    return first.offset;
+                } else {
+                    // defensive programming - should be impossible
+                    this.waiting.remove(first);
+                    return nextFailedMessageToRetry();
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public boolean shouldRetryMsg(Long offset) {
+        MessageRetryRecord record = this.records.get(offset);
+        return record != null &&
+                this.waiting.contains(record) &&
+                System.currentTimeMillis() >= record.retryTimeUTC;
+    }
+
+    /**
+     * A MessageRetryRecord holds the data of how many times a message has
+     * failed and been retried, and the time at which its next retry is due.
+     * That time is computed with an exponential back-off calculation using
+     * the values passed to the ExponentialBackoffMsgRetryManager constructor:
+     * <ul>
+     *  <li>retryInitialDelayMs - time to delay before the first retry</li>
+     *  <li>retryDelayMultiplier - multiplier by which to increase the delay 
for each subsequent retry</li>
+     *  <li>retryDelayMaxMs - maximum retry delay (once this delay time is 
reached, subsequent retries will
+     *                        delay for this amount of time every time)
+     *  </li>
+     * </ul>
+     */
+    class MessageRetryRecord implements Comparable<MessageRetryRecord> {
+        private final long offset;
+        private final int retryNum;
+        private final long retryTimeUTC;
+
+        public MessageRetryRecord(long offset) {
+            this(offset, 1);
+        }
+
+        private MessageRetryRecord(long offset, int retryNum) {
+            this.offset = offset;
+            this.retryNum = retryNum;
+            this.retryTimeUTC = System.currentTimeMillis() + 
calculateRetryDelay();
+        }
+
+        /**
+         * Create a MessageRetryRecord for the next retry that should occur
+         * after this one.
+         * @return a new MessageRetryRecord for the same offset with the retry
+         *         number incremented by one; never null.  The retry time is not
+         *         checked against backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS.
+         */
+        public MessageRetryRecord createNextRetryRecord() {
+            return new MessageRetryRecord(this.offset, this.retryNum + 1);
+        }
+
+        private long calculateRetryDelay() {
+            double delayMultiplier = Math.pow(retryDelayMultiplier, 
this.retryNum - 1);
+            long delayThisRetryMs = (long) (retryInitialDelayMs * 
delayMultiplier);
+            return Math.min(delayThisRetryMs, retryDelayMaxMs);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            return (other instanceof MessageRetryRecord
+                    && this.offset == ((MessageRetryRecord) other).offset);
+        }
+
+        @Override
+        public int hashCode() {
+            return Long.valueOf(this.offset).hashCode();
+        }
+
+        @Override
+        public int compareTo(MessageRetryRecord other) {
+            return Long.compare(this.retryTimeUTC, other.retryTimeUTC);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/02bffc60/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java 
b/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
new file mode 100644
index 0000000..3f0e117
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+public interface FailedMsgRetryManager {
+    public void failed(Long offset);
+    public void acked(Long offset);
+    public void retryStarted(Long offset);
+    public Long nextFailedMessageToRetry();
+    public boolean shouldRetryMsg(Long offset);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/02bffc60/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java 
b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index cff6df0..17dd72d 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -23,7 +23,6 @@ import backtype.storm.metric.api.CountMetric;
 import backtype.storm.metric.api.MeanReducer;
 import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.utils.Utils;
 import com.google.common.collect.ImmutableMap;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
@@ -38,13 +37,6 @@ import java.util.*;
 
 public class PartitionManager {
     public static final Logger LOG = 
LoggerFactory.getLogger(PartitionManager.class);
-    private static final String TIMES_UP_MSG =
-            "Retry logic in your topology is taking longer to complete than is 
allowed by your"
-            +" Storm Config setting TOPOLOGY_MESSAGE_TIMEOUT_SECS (%s 
seconds).  (i.e., you have"
-            +" called OutputCollector.fail() too many times for this message). 
 KafkaSpout has"
-            +" aborted next retry attempt (retry %s) for the Kafka message at 
offset %s since it"
-            +" would occur after this timeout.";
-    private static final long TIMEOUT_RESET_VALUE = -1L;
 
     private final CombinedMetric _fetchAPILatencyMax;
     private final ReducedMetric _fetchAPILatencyMean;
@@ -53,10 +45,9 @@ public class PartitionManager {
     Long _emittedToOffset;
     // _pending key = Kafka offset, value = time at which the message was 
first submitted to the topology
     private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
-    private SortedSet<Long> failed = new TreeSet<Long>();
+    private final FailedMsgRetryManager _failedMsgRetryManager;
 
     // retryRecords key = Kafka offset, value = retry info for the given 
message
-    private Map<Long,MessageRetryRecord> retryRecords = new 
HashMap<Long,MessageRetryRecord>();
     Long _committedTo;
     LinkedList<MessageAndRealOffset> _waitingToEmit = new 
LinkedList<MessageAndRealOffset>();
     Partition _partition;
@@ -77,6 +68,10 @@ public class PartitionManager {
         _stormConf = stormConf;
         numberAcked = numberFailed = 0;
 
+        _failedMsgRetryManager = new 
ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs,
+                                                                           
_spoutConfig.retryDelayMultiplier,
+                                                                           
_spoutConfig.retryDelayMaxMs);
+
         String jsonTopologyId = null;
         Long jsonOffset = null;
         String path = committedPath();
@@ -156,31 +151,15 @@ public class PartitionManager {
         }
     }
 
-    /**
-     * Fetch the failed messages ready for retry.  If there are no failed 
messages, or none are ready for retry, then it
-     * returns an empty List (i.e., not null).
-     */
-    private SortedSet<Long> failedMsgsReadyForRetry() {
-        SortedSet<Long> ready = new TreeSet<Long>();
-        for (Long offset : this.failed) {
-            if (this.retryRecords.get(offset).isReadyForRetry()) {
-                ready.add(offset);
-            }
-        }
-        return ready;
-    }
-
 
     private void fill() {
         long start = System.nanoTime();
-        long offset;
-        final SortedSet<Long> failedReady = failedMsgsReadyForRetry();
+        Long offset;
 
         // Are there failed tuples? If so, fetch those first.
-        final boolean had_failed = !failedReady.isEmpty();
-        if (had_failed) {
-            offset = failedReady.first();
-        } else {
+        offset = this._failedMsgRetryManager.nextFailedMessageToRetry();
+        final boolean processingNewTuples = (offset == null);
+        if (processingNewTuples) {
             offset = _emittedToOffset;
         }
 
@@ -199,15 +178,15 @@ public class PartitionManager {
                     // Skip any old offsets.
                     continue;
                 }
-                if (!had_failed || failedReady.contains(cur_offset)) {
+                if (processingNewTuples || 
this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
                     numMessages += 1;
                     if (!_pending.containsKey(cur_offset)) {
                         _pending.put(cur_offset, System.currentTimeMillis());
                     }
                     _waitingToEmit.add(new MessageAndRealOffset(msg.message(), 
cur_offset));
                     _emittedToOffset = Math.max(msg.nextOffset(), 
_emittedToOffset);
-                    if (had_failed) {
-                        failed.remove(cur_offset);
+                    if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+                        this._failedMsgRetryManager.retryStarted(cur_offset);
                     }
                 }
             }
@@ -221,7 +200,7 @@ public class PartitionManager {
             _pending.headMap(offset - _spoutConfig.maxOffsetBehind).clear();
         }
         _pending.remove(offset);
-        retryRecords.remove(offset);
+        this._failedMsgRetryManager.acked(offset);
         numberAcked++;
     }
 
@@ -239,18 +218,7 @@ public class PartitionManager {
                 throw new RuntimeException("Too many tuple failures");
             }
 
-            try {
-                MessageRetryRecord retryRecord = retryRecords.get(offset);
-                retryRecord = retryRecord == null
-                              ? new MessageRetryRecord(offset)
-                              : retryRecord.createNextRetryRecord();
-
-                retryRecords.put(offset, retryRecord);
-                failed.add(offset);
-
-            } catch (MessageRetryRecord.AvailableRetryTimeExceededException e) 
{
-                LOG.error("cannot retry", e);
-            }
+            this._failedMsgRetryManager.failed(offset);
         }
     }
 
@@ -304,90 +272,4 @@ public class PartitionManager {
             this.offset = offset;
         }
     }
-
-    /**
-     * A MessageRetryRecord holds the data of how many times a message has
-     * failed and been retried, and when the last failure occurred.  It can
-     * determine whether it is ready to be retried by employing an exponential
-     * back-off calculation using config values stored in SpoutConfig:
-     * <ul>
-     *  <li>retryInitialDelayMs - time to delay before the first retry</li>
-     *  <li>retryDelayMultiplier - multiplier by which to increase the delay 
for each subsequent retry</li>
-     *  <li>retryDelayMaxMs - maximum retry delay (once this delay time is 
reached, subsequent retries will
-     *                        delay for this amount of time every time)
-     *  </li>
-     * </ul>
-     */
-    class MessageRetryRecord {
-        private final long offset;
-        private final int retryNum;
-        private final long retryTimeUTC;
-
-        public MessageRetryRecord(long offset) throws 
AvailableRetryTimeExceededException {
-            this(offset, 1);
-        }
-
-        private MessageRetryRecord(long offset, int retryNum) throws 
AvailableRetryTimeExceededException {
-            this.offset = offset;
-            this.retryNum = retryNum;
-            this.retryTimeUTC = System.currentTimeMillis() + 
calculateRetryDelay();
-            validateRetryTime();
-        }
-
-        /**
-         * Create a MessageRetryRecord for the next retry that should occur 
after this one.
-         * @return MessageRetryRecord with the next retry time, or null to 
indicate that another
-         *         retry should not be performed.  The latter case can happen 
if we are about to
-         *         run into the 
backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS in the Storm
-         *         configuration.
-         */
-        public MessageRetryRecord createNextRetryRecord() throws 
AvailableRetryTimeExceededException {
-            return new MessageRetryRecord(this.offset, this.retryNum + 1);
-        }
-
-        private void validateRetryTime() throws 
AvailableRetryTimeExceededException {
-            long stormStartTime = 
PartitionManager.this._pending.get(this.offset);
-
-            if (stormStartTime == TIMEOUT_RESET_VALUE) {
-                // This is a resubmission from the Storm framework after 
Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
-                // has elapsed.  Restart my timer.
-                PartitionManager.this._pending.put(this.offset, 
System.currentTimeMillis());
-
-            } else {
-                int timeoutSeconds = 
Utils.getInt(_stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
-                if (this.retryTimeUTC - stormStartTime > timeoutSeconds * 
1000) {
-
-                    // Prepare for when the Storm framework calls fail()
-                    _pending.put(this.offset, TIMEOUT_RESET_VALUE);
-
-                    throw new 
AvailableRetryTimeExceededException(String.format(TIMES_UP_MSG,
-                                                                               
 timeoutSeconds,
-                                                                               
 this.retryNum,
-                                                                               
 this.offset));
-
-                } else {
-                    LOG.warn(String.format("allowing another retry: start=%s, 
retryTime=%s, timeoutSeconds=%s",
-                                           (stormStartTime / 1000) % 1000,
-                                           (this.retryTimeUTC / 1000) % 1000,
-                                           timeoutSeconds));
-                }
-            }
-        }
-
-        private long calculateRetryDelay() {
-            double delayMultiplier = 
Math.pow(_spoutConfig.retryDelayMultiplier, this.retryNum - 1);
-            long delayThisRetryMs = (long) (_spoutConfig.retryInitialDelayMs * 
delayMultiplier);
-            return Math.min(delayThisRetryMs, _spoutConfig.retryDelayMaxMs);
-        }
-
-        public boolean isReadyForRetry() {
-            return System.currentTimeMillis() > this.retryTimeUTC;
-        }
-
-        class AvailableRetryTimeExceededException extends Exception {
-            public AvailableRetryTimeExceededException(String msg) {
-                super(msg);
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/02bffc60/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java 
b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
index 1c40ddf..61d0b35 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
@@ -32,9 +32,6 @@ public class SpoutConfig extends KafkaConfig implements 
Serializable {
 
     // Exponential back-off retry settings.  These are used when retrying 
messages after a bolt
     // calls OutputCollector.fail().
-    //
-    // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS 
appropriately to prevent
-    // resubmitting the message while still retrying.
     public long retryInitialDelayMs = 0;
     public double retryDelayMultiplier = 1.0;
     public long retryDelayMaxMs = 60 * 1000;

http://git-wip-us.apache.org/repos/asf/storm/blob/02bffc60/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
 
b/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
new file mode 100644
index 0000000..ef30163
--- /dev/null
+++ 
b/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -0,0 +1,194 @@
+package storm.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+public class ExponentialBackoffMsgRetryManagerTest {
+
+    private static final Long TEST_OFFSET = 101L;
+    private static final Long TEST_OFFSET2 = 102L;
+
+    @Test
+    public void testImmediateRetry() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new 
ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", 
TEST_OFFSET, next);
+        assertTrue("message should be ready for retry immediately", 
manager.shouldRetryMsg(TEST_OFFSET));
+
+        manager.retryStarted(TEST_OFFSET);
+
+        manager.failed(TEST_OFFSET);
+        next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", 
TEST_OFFSET, next);
+        assertTrue("message should be ready for retry immediately", 
manager.shouldRetryMsg(TEST_OFFSET));
+    }
+
+    @Test
+    public void testSingleDelay() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new 
ExponentialBackoffMsgRetryManager(10, 1d, 100);
+        manager.failed(TEST_OFFSET);
+        Thread.sleep(5);
+        Long next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message ready for retry yet", next);
+        assertFalse("message should not be ready for retry yet", 
manager.shouldRetryMsg(TEST_OFFSET));
+
+        Thread.sleep(10);
+        next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", 
TEST_OFFSET, next);
+        assertTrue("message should be ready for retry", 
manager.shouldRetryMsg(TEST_OFFSET));
+    }
+
+    @Test
+    public void testExponentialBackoff() throws Exception {
+        final long initial = 10;
+        final double mult = 2d;
+        ExponentialBackoffMsgRetryManager manager = new 
ExponentialBackoffMsgRetryManager(initial, mult, initial * 10);
+
+        long expectedWaitTime = initial;
+        for (long i = 0L; i < 3L; ++i) {
+            manager.failed(TEST_OFFSET);
+
+            Thread.sleep((expectedWaitTime + 1L) / 2L);
+            assertFalse("message should not be ready for retry yet", 
manager.shouldRetryMsg(TEST_OFFSET));
+
+            Thread.sleep((expectedWaitTime + 1L) / 2L);
+            Long next = manager.nextFailedMessageToRetry();
+            assertEquals("expect test offset next available for retry", 
TEST_OFFSET, next);
+            assertTrue("message should be ready for retry", 
manager.shouldRetryMsg(TEST_OFFSET));
+
+            manager.retryStarted(TEST_OFFSET);
+            expectedWaitTime *= mult;
+        }
+    }
+
+    @Test
+    public void testRetryOrder() throws Exception {
+        final long initial = 10;
+        final double mult = 2d;
+        final long max = 20;
+        ExponentialBackoffMsgRetryManager manager = new 
ExponentialBackoffMsgRetryManager(initial, mult, max);
+
+        manager.failed(TEST_OFFSET);
+        Thread.sleep(initial);
+
+        manager.retryStarted(TEST_OFFSET);
+        manager.failed(TEST_OFFSET);
+        manager.failed(TEST_OFFSET2);
+
+        // although TEST_OFFSET failed first, its retry delay time is longer 
b/c this is the second retry
+        // so TEST_OFFSET2 should come first
+
+        Thread.sleep(initial * 2);
+        assertTrue("message "+TEST_OFFSET+"should be ready for retry", 
manager.shouldRetryMsg(TEST_OFFSET));
+        assertTrue("message "+TEST_OFFSET2+"should be ready for retry", 
manager.shouldRetryMsg(TEST_OFFSET2));
+
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect first message to retry is "+TEST_OFFSET2, 
TEST_OFFSET2, next);
+
+        Thread.sleep(initial);
+
+        // haven't retried yet, so first should still be TEST_OFFSET2
+        next = manager.nextFailedMessageToRetry();
+        assertEquals("expect first message to retry is "+TEST_OFFSET2, 
TEST_OFFSET2, next);
+        manager.retryStarted(next);
+
+        // now it should be TEST_OFFSET
+        next = manager.nextFailedMessageToRetry();
+        assertEquals("expect message to retry is now "+TEST_OFFSET, 
TEST_OFFSET, next);
+        manager.retryStarted(next);
+
+        // now none left
+        next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message to retry now", next);
+    }
+
+    @Test
+    public void testQueriesAfterRetriedAlready() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new 
ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", 
TEST_OFFSET, next);
+        assertTrue("message should be ready for retry immediately", 
manager.shouldRetryMsg(TEST_OFFSET));
+
+        manager.retryStarted(TEST_OFFSET);
+        next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message ready after retried", next);
+        assertFalse("message should not be ready after retried", 
manager.shouldRetryMsg(TEST_OFFSET));
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testRetryWithoutFail() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new 
ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.retryStarted(TEST_OFFSET);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testFailRetryRetry() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new 
ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        try {
+            manager.retryStarted(TEST_OFFSET);
+        } catch (IllegalStateException ise) {
+            fail("IllegalStateException unexpected here: " + ise);
+        }
+
+        assertFalse("message should not be ready for retry", 
manager.shouldRetryMsg(TEST_OFFSET));
+        manager.retryStarted(TEST_OFFSET);
+    }
+
+    @Test
+    public void testMaxBackoff() throws Exception {
+        final long initial = 10;
+        final double mult = 2d;
+        final long max = 20;
+        ExponentialBackoffMsgRetryManager manager = new 
ExponentialBackoffMsgRetryManager(initial, mult, max);
+
+        long expectedWaitTime = initial;
+        for (long i = 0L; i < 4L; ++i) {
+            manager.failed(TEST_OFFSET);
+
+            Thread.sleep((expectedWaitTime + 1L) / 2L);
+            assertFalse("message should not be ready for retry yet", 
manager.shouldRetryMsg(TEST_OFFSET));
+
+            Thread.sleep((expectedWaitTime + 1L) / 2L);
+            Long next = manager.nextFailedMessageToRetry();
+            assertEquals("expect test offset next available for retry", 
TEST_OFFSET, next);
+            assertTrue("message should be ready for retry", 
manager.shouldRetryMsg(TEST_OFFSET));
+
+            manager.retryStarted(TEST_OFFSET);
+            expectedWaitTime = Math.min((long) (expectedWaitTime * mult), max);
+        }
+    }
+
+    @Test
+    public void testFailThenAck() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new 
ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.failed(TEST_OFFSET);
+        assertTrue("message should be ready for retry", 
manager.shouldRetryMsg(TEST_OFFSET));
+
+        manager.acked(TEST_OFFSET);
+
+        Long next = manager.nextFailedMessageToRetry();
+        assertNull("expect no message ready after acked", next);
+        assertFalse("message should not be ready after acked", 
manager.shouldRetryMsg(TEST_OFFSET));
+    }
+
+    @Test
+    public void testAckThenFail() throws Exception {
+        ExponentialBackoffMsgRetryManager manager = new 
ExponentialBackoffMsgRetryManager(0, 0d, 0);
+        manager.acked(TEST_OFFSET);
+        assertFalse("message should not be ready after acked", 
manager.shouldRetryMsg(TEST_OFFSET));
+
+        manager.failed(TEST_OFFSET);
+
+        Long next = manager.nextFailedMessageToRetry();
+        assertEquals("expect test offset next available for retry", 
TEST_OFFSET, next);
+        assertTrue("message should be ready for retry", 
manager.shouldRetryMsg(TEST_OFFSET));
+    }
+}
\ No newline at end of file

Reply via email to