separate retry logic into manager class
separate retry logic into manager class ExponentialBackoffMsgRetryManager
fixed logic with regards to TOPOLOGY_MESSAGE_TIMEOUT_SECS (this is not a max
time across retries, but rather a max time within a retry - so there is
no conflict and no need to account for it in the retry logic).
tests for ExponentialBackoffMsgRetryManager
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/02bffc60
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/02bffc60
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/02bffc60
Branch: refs/heads/master
Commit: 02bffc601e71d89948a7c5c214d1ae69ccc8ef1d
Parents: 882dfc5
Author: Rick Kilgore <[email protected]>
Authored: Fri Nov 21 18:00:51 2014 -0800
Committer: Rick Kilgore <[email protected]>
Committed: Fri Nov 21 18:00:51 2014 -0800
----------------------------------------------------------------------
external/storm-kafka/pom.xml | 21 +-
.../ExponentialBackoffMsgRetryManager.java | 154 +++++++++++++++
.../jvm/storm/kafka/FailedMsgRetryManager.java | 26 +++
.../src/jvm/storm/kafka/PartitionManager.java | 146 ++------------
.../src/jvm/storm/kafka/SpoutConfig.java | 3 -
.../ExponentialBackoffMsgRetryManagerTest.java | 194 +++++++++++++++++++
6 files changed, 408 insertions(+), 136 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/02bffc60/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 05bb117..f17af74 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -31,7 +31,26 @@
<description>Storm Spouts for Apache Kafka</description>
<build>
<plugins>
-
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ <version>0.7.2.201409121644</version>
+ <executions>
+ <execution>
+ <id>jacoco-initialize</id>
+ <goals>
+ <goal>prepare-agent</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>jacoco-report</id>
+ <phase>test</phase>
+ <goals>
+ <goal>report</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
<sourceDirectory>src/jvm</sourceDirectory>
<testSourceDirectory>src/test</testSourceDirectory>
http://git-wip-us.apache.org/repos/asf/storm/blob/02bffc60/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git
a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
new file mode 100644
index 0000000..cace9cd
--- /dev/null
+++
b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+public class ExponentialBackoffMsgRetryManager implements
FailedMsgRetryManager {
+
+ private final long retryInitialDelayMs;
+ private final double retryDelayMultiplier;
+ private final long retryDelayMaxMs;
+
+ private Queue<MessageRetryRecord> waiting = new
PriorityQueue<MessageRetryRecord>();
+ private SortedMap<Long,MessageRetryRecord> records = new
TreeMap<Long,MessageRetryRecord>();
+
+ public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double
retryDelayMultiplier, long retryDelayMaxMs) {
+ this.retryInitialDelayMs = retryInitialDelayMs;
+ this.retryDelayMultiplier = retryDelayMultiplier;
+ this.retryDelayMaxMs = retryDelayMaxMs;
+ }
+
+ @Override
+ public void failed(Long offset) {
+ MessageRetryRecord oldRecord = this.records.get(offset);
+ MessageRetryRecord newRecord = oldRecord == null ?
+ new MessageRetryRecord(offset) :
+ oldRecord.createNextRetryRecord();
+ this.records.put(offset, newRecord);
+ this.waiting.add(newRecord);
+ }
+
+ @Override
+ public void acked(Long offset) {
+ MessageRetryRecord record = this.records.remove(offset);
+ if (record != null) {
+ this.waiting.remove(record);
+ }
+ }
+
+ @Override
+ public void retryStarted(Long offset) {
+ MessageRetryRecord record = this.records.get(offset);
+ if (record == null || !this.waiting.contains(record)) {
+ throw new IllegalStateException("cannot retry a message that has
not failed");
+ } else {
+ this.waiting.remove(record);
+ }
+ }
+
+ @Override
+ public Long nextFailedMessageToRetry() {
+ if (this.waiting.size() > 0) {
+ MessageRetryRecord first = this.waiting.peek();
+ if (System.currentTimeMillis() >= first.retryTimeUTC) {
+ if (this.records.containsKey(first.offset)) {
+ return first.offset;
+ } else {
+ // defensive programming - should be impossible
+ this.waiting.remove(first);
+ return nextFailedMessageToRetry();
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public boolean shouldRetryMsg(Long offset) {
+ MessageRetryRecord record = this.records.get(offset);
+ return record != null &&
+ this.waiting.contains(record) &&
+ System.currentTimeMillis() >= record.retryTimeUTC;
+ }
+
+ /**
+ * A MessageRetryRecord holds the data of how many times a message has
+ * failed and been retried, and when the last failure occurred. It can
+ * determine whether it is ready to be retried by employing an exponential
+ * back-off calculation using config values stored in SpoutConfig:
+ * <ul>
+ * <li>retryInitialDelayMs - time to delay before the first retry</li>
+ * <li>retryDelayMultiplier - multiplier by which to increase the delay
for each subsequent retry</li>
+ * <li>retryDelayMaxMs - maximum retry delay (once this delay time is
reached, subsequent retries will
+ * delay for this amount of time every time)
+ * </li>
+ * </ul>
+ */
+ class MessageRetryRecord implements Comparable<MessageRetryRecord> {
+ private final long offset;
+ private final int retryNum;
+ private final long retryTimeUTC;
+
+ public MessageRetryRecord(long offset) {
+ this(offset, 1);
+ }
+
+ private MessageRetryRecord(long offset, int retryNum) {
+ this.offset = offset;
+ this.retryNum = retryNum;
+ this.retryTimeUTC = System.currentTimeMillis() +
calculateRetryDelay();
+ }
+
+ /**
+ * Create a MessageRetryRecord for the next retry that should occur
after this one.
+ * @return MessageRetryRecord with the next retry time, or null to
indicate that another
+ * retry should not be performed. The latter case can happen
if we are about to
+ * run into the
backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS in the Storm
+ * configuration.
+ */
+ public MessageRetryRecord createNextRetryRecord() {
+ return new MessageRetryRecord(this.offset, this.retryNum + 1);
+ }
+
+ private long calculateRetryDelay() {
+ double delayMultiplier = Math.pow(retryDelayMultiplier,
this.retryNum - 1);
+ long delayThisRetryMs = (long) (retryInitialDelayMs *
delayMultiplier);
+ return Math.min(delayThisRetryMs, retryDelayMaxMs);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return (other instanceof MessageRetryRecord
+ && this.offset == ((MessageRetryRecord) other).offset);
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.valueOf(this.offset).hashCode();
+ }
+
+ @Override
+ public int compareTo(MessageRetryRecord other) {
+ return Long.compare(this.retryTimeUTC, other.retryTimeUTC);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/02bffc60/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git
a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
b/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
new file mode 100644
index 0000000..3f0e117
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+public interface FailedMsgRetryManager {
+ public void failed(Long offset);
+ public void acked(Long offset);
+ public void retryStarted(Long offset);
+ public Long nextFailedMessageToRetry();
+ public boolean shouldRetryMsg(Long offset);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/02bffc60/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index cff6df0..17dd72d 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -23,7 +23,6 @@ import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.utils.Utils;
import com.google.common.collect.ImmutableMap;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
@@ -38,13 +37,6 @@ import java.util.*;
public class PartitionManager {
public static final Logger LOG =
LoggerFactory.getLogger(PartitionManager.class);
- private static final String TIMES_UP_MSG =
- "Retry logic in your topology is taking longer to complete than is
allowed by your"
- +" Storm Config setting TOPOLOGY_MESSAGE_TIMEOUT_SECS (%s
seconds). (i.e., you have"
- +" called OutputCollector.fail() too many times for this message).
KafkaSpout has"
- +" aborted next retry attempt (retry %s) for the Kafka message at
offset %s since it"
- +" would occur after this timeout.";
- private static final long TIMEOUT_RESET_VALUE = -1L;
private final CombinedMetric _fetchAPILatencyMax;
private final ReducedMetric _fetchAPILatencyMean;
@@ -53,10 +45,9 @@ public class PartitionManager {
Long _emittedToOffset;
// _pending key = Kafka offset, value = time at which the message was
first submitted to the topology
private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
- private SortedSet<Long> failed = new TreeSet<Long>();
+ private final FailedMsgRetryManager _failedMsgRetryManager;
// retryRecords key = Kafka offset, value = retry info for the given
message
- private Map<Long,MessageRetryRecord> retryRecords = new
HashMap<Long,MessageRetryRecord>();
Long _committedTo;
LinkedList<MessageAndRealOffset> _waitingToEmit = new
LinkedList<MessageAndRealOffset>();
Partition _partition;
@@ -77,6 +68,10 @@ public class PartitionManager {
_stormConf = stormConf;
numberAcked = numberFailed = 0;
+ _failedMsgRetryManager = new
ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs,
+
_spoutConfig.retryDelayMultiplier,
+
_spoutConfig.retryDelayMaxMs);
+
String jsonTopologyId = null;
Long jsonOffset = null;
String path = committedPath();
@@ -156,31 +151,15 @@ public class PartitionManager {
}
}
- /**
- * Fetch the failed messages ready for retry. If there are no failed
messages, or none are ready for retry, then it
- * returns an empty List (i.e., not null).
- */
- private SortedSet<Long> failedMsgsReadyForRetry() {
- SortedSet<Long> ready = new TreeSet<Long>();
- for (Long offset : this.failed) {
- if (this.retryRecords.get(offset).isReadyForRetry()) {
- ready.add(offset);
- }
- }
- return ready;
- }
-
private void fill() {
long start = System.nanoTime();
- long offset;
- final SortedSet<Long> failedReady = failedMsgsReadyForRetry();
+ Long offset;
// Are there failed tuples? If so, fetch those first.
- final boolean had_failed = !failedReady.isEmpty();
- if (had_failed) {
- offset = failedReady.first();
- } else {
+ offset = this._failedMsgRetryManager.nextFailedMessageToRetry();
+ final boolean processingNewTuples = (offset == null);
+ if (processingNewTuples) {
offset = _emittedToOffset;
}
@@ -199,15 +178,15 @@ public class PartitionManager {
// Skip any old offsets.
continue;
}
- if (!had_failed || failedReady.contains(cur_offset)) {
+ if (processingNewTuples ||
this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
numMessages += 1;
if (!_pending.containsKey(cur_offset)) {
_pending.put(cur_offset, System.currentTimeMillis());
}
_waitingToEmit.add(new MessageAndRealOffset(msg.message(),
cur_offset));
_emittedToOffset = Math.max(msg.nextOffset(),
_emittedToOffset);
- if (had_failed) {
- failed.remove(cur_offset);
+ if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) {
+ this._failedMsgRetryManager.retryStarted(cur_offset);
}
}
}
@@ -221,7 +200,7 @@ public class PartitionManager {
_pending.headMap(offset - _spoutConfig.maxOffsetBehind).clear();
}
_pending.remove(offset);
- retryRecords.remove(offset);
+ this._failedMsgRetryManager.acked(offset);
numberAcked++;
}
@@ -239,18 +218,7 @@ public class PartitionManager {
throw new RuntimeException("Too many tuple failures");
}
- try {
- MessageRetryRecord retryRecord = retryRecords.get(offset);
- retryRecord = retryRecord == null
- ? new MessageRetryRecord(offset)
- : retryRecord.createNextRetryRecord();
-
- retryRecords.put(offset, retryRecord);
- failed.add(offset);
-
- } catch (MessageRetryRecord.AvailableRetryTimeExceededException e)
{
- LOG.error("cannot retry", e);
- }
+ this._failedMsgRetryManager.failed(offset);
}
}
@@ -304,90 +272,4 @@ public class PartitionManager {
this.offset = offset;
}
}
-
- /**
- * A MessageRetryRecord holds the data of how many times a message has
- * failed and been retried, and when the last failure occurred. It can
- * determine whether it is ready to be retried by employing an exponential
- * back-off calculation using config values stored in SpoutConfig:
- * <ul>
- * <li>retryInitialDelayMs - time to delay before the first retry</li>
- * <li>retryDelayMultiplier - multiplier by which to increase the delay
for each subsequent retry</li>
- * <li>retryDelayMaxMs - maximum retry delay (once this delay time is
reached, subsequent retries will
- * delay for this amount of time every time)
- * </li>
- * </ul>
- */
- class MessageRetryRecord {
- private final long offset;
- private final int retryNum;
- private final long retryTimeUTC;
-
- public MessageRetryRecord(long offset) throws
AvailableRetryTimeExceededException {
- this(offset, 1);
- }
-
- private MessageRetryRecord(long offset, int retryNum) throws
AvailableRetryTimeExceededException {
- this.offset = offset;
- this.retryNum = retryNum;
- this.retryTimeUTC = System.currentTimeMillis() +
calculateRetryDelay();
- validateRetryTime();
- }
-
- /**
- * Create a MessageRetryRecord for the next retry that should occur
after this one.
- * @return MessageRetryRecord with the next retry time, or null to
indicate that another
- * retry should not be performed. The latter case can happen
if we are about to
- * run into the
backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS in the Storm
- * configuration.
- */
- public MessageRetryRecord createNextRetryRecord() throws
AvailableRetryTimeExceededException {
- return new MessageRetryRecord(this.offset, this.retryNum + 1);
- }
-
- private void validateRetryTime() throws
AvailableRetryTimeExceededException {
- long stormStartTime =
PartitionManager.this._pending.get(this.offset);
-
- if (stormStartTime == TIMEOUT_RESET_VALUE) {
- // This is a resubmission from the Storm framework after
Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
- // has elapsed. Restart my timer.
- PartitionManager.this._pending.put(this.offset,
System.currentTimeMillis());
-
- } else {
- int timeoutSeconds =
Utils.getInt(_stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
- if (this.retryTimeUTC - stormStartTime > timeoutSeconds *
1000) {
-
- // Prepare for when the Storm framework calls fail()
- _pending.put(this.offset, TIMEOUT_RESET_VALUE);
-
- throw new
AvailableRetryTimeExceededException(String.format(TIMES_UP_MSG,
-
timeoutSeconds,
-
this.retryNum,
-
this.offset));
-
- } else {
- LOG.warn(String.format("allowing another retry: start=%s,
retryTime=%s, timeoutSeconds=%s",
- (stormStartTime / 1000) % 1000,
- (this.retryTimeUTC / 1000) % 1000,
- timeoutSeconds));
- }
- }
- }
-
- private long calculateRetryDelay() {
- double delayMultiplier =
Math.pow(_spoutConfig.retryDelayMultiplier, this.retryNum - 1);
- long delayThisRetryMs = (long) (_spoutConfig.retryInitialDelayMs *
delayMultiplier);
- return Math.min(delayThisRetryMs, _spoutConfig.retryDelayMaxMs);
- }
-
- public boolean isReadyForRetry() {
- return System.currentTimeMillis() > this.retryTimeUTC;
- }
-
- class AvailableRetryTimeExceededException extends Exception {
- public AvailableRetryTimeExceededException(String msg) {
- super(msg);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/02bffc60/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
index 1c40ddf..61d0b35 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
@@ -32,9 +32,6 @@ public class SpoutConfig extends KafkaConfig implements
Serializable {
// Exponential back-off retry settings. These are used when retrying
messages after a bolt
// calls OutputCollector.fail().
- //
- // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS
appropriately to prevent
- // resubmitting the message while still retrying.
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
public long retryDelayMaxMs = 60 * 1000;
http://git-wip-us.apache.org/repos/asf/storm/blob/02bffc60/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
----------------------------------------------------------------------
diff --git
a/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
b/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
new file mode 100644
index 0000000..ef30163
--- /dev/null
+++
b/external/storm-kafka/src/test/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java
@@ -0,0 +1,194 @@
+package storm.kafka;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.junit.Test;
+
+public class ExponentialBackoffMsgRetryManagerTest {
+
+ private static final Long TEST_OFFSET = 101L;
+ private static final Long TEST_OFFSET2 = 102L;
+
+ @Test
+ public void testImmediateRetry() throws Exception {
+ ExponentialBackoffMsgRetryManager manager = new
ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ manager.failed(TEST_OFFSET);
+ Long next = manager.nextFailedMessageToRetry();
+ assertEquals("expect test offset next available for retry",
TEST_OFFSET, next);
+ assertTrue("message should be ready for retry immediately",
manager.shouldRetryMsg(TEST_OFFSET));
+
+ manager.retryStarted(TEST_OFFSET);
+
+ manager.failed(TEST_OFFSET);
+ next = manager.nextFailedMessageToRetry();
+ assertEquals("expect test offset next available for retry",
TEST_OFFSET, next);
+ assertTrue("message should be ready for retry immediately",
manager.shouldRetryMsg(TEST_OFFSET));
+ }
+
+ @Test
+ public void testSingleDelay() throws Exception {
+ ExponentialBackoffMsgRetryManager manager = new
ExponentialBackoffMsgRetryManager(10, 1d, 100);
+ manager.failed(TEST_OFFSET);
+ Thread.sleep(5);
+ Long next = manager.nextFailedMessageToRetry();
+ assertNull("expect no message ready for retry yet", next);
+ assertFalse("message should not be ready for retry yet",
manager.shouldRetryMsg(TEST_OFFSET));
+
+ Thread.sleep(10);
+ next = manager.nextFailedMessageToRetry();
+ assertEquals("expect test offset next available for retry",
TEST_OFFSET, next);
+ assertTrue("message should be ready for retry",
manager.shouldRetryMsg(TEST_OFFSET));
+ }
+
+ @Test
+ public void testExponentialBackoff() throws Exception {
+ final long initial = 10;
+ final double mult = 2d;
+ ExponentialBackoffMsgRetryManager manager = new
ExponentialBackoffMsgRetryManager(initial, mult, initial * 10);
+
+ long expectedWaitTime = initial;
+ for (long i = 0L; i < 3L; ++i) {
+ manager.failed(TEST_OFFSET);
+
+ Thread.sleep((expectedWaitTime + 1L) / 2L);
+ assertFalse("message should not be ready for retry yet",
manager.shouldRetryMsg(TEST_OFFSET));
+
+ Thread.sleep((expectedWaitTime + 1L) / 2L);
+ Long next = manager.nextFailedMessageToRetry();
+ assertEquals("expect test offset next available for retry",
TEST_OFFSET, next);
+ assertTrue("message should be ready for retry",
manager.shouldRetryMsg(TEST_OFFSET));
+
+ manager.retryStarted(TEST_OFFSET);
+ expectedWaitTime *= mult;
+ }
+ }
+
+ @Test
+ public void testRetryOrder() throws Exception {
+ final long initial = 10;
+ final double mult = 2d;
+ final long max = 20;
+ ExponentialBackoffMsgRetryManager manager = new
ExponentialBackoffMsgRetryManager(initial, mult, max);
+
+ manager.failed(TEST_OFFSET);
+ Thread.sleep(initial);
+
+ manager.retryStarted(TEST_OFFSET);
+ manager.failed(TEST_OFFSET);
+ manager.failed(TEST_OFFSET2);
+
+ // although TEST_OFFSET failed first, it's retry delay time is longer
b/c this is the second retry
+ // so TEST_OFFSET2 should come first
+
+ Thread.sleep(initial * 2);
+ assertTrue("message "+TEST_OFFSET+"should be ready for retry",
manager.shouldRetryMsg(TEST_OFFSET));
+ assertTrue("message "+TEST_OFFSET2+"should be ready for retry",
manager.shouldRetryMsg(TEST_OFFSET2));
+
+ Long next = manager.nextFailedMessageToRetry();
+ assertEquals("expect first message to retry is "+TEST_OFFSET2,
TEST_OFFSET2, next);
+
+ Thread.sleep(initial);
+
+ // haven't retried yet, so first should still be TEST_OFFSET2
+ next = manager.nextFailedMessageToRetry();
+ assertEquals("expect first message to retry is "+TEST_OFFSET2,
TEST_OFFSET2, next);
+ manager.retryStarted(next);
+
+ // now it should be TEST_OFFSET
+ next = manager.nextFailedMessageToRetry();
+ assertEquals("expect message to retry is now "+TEST_OFFSET,
TEST_OFFSET, next);
+ manager.retryStarted(next);
+
+ // now none left
+ next = manager.nextFailedMessageToRetry();
+ assertNull("expect no message to retry now", next);
+ }
+
+ @Test
+ public void testQueriesAfterRetriedAlready() throws Exception {
+ ExponentialBackoffMsgRetryManager manager = new
ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ manager.failed(TEST_OFFSET);
+ Long next = manager.nextFailedMessageToRetry();
+ assertEquals("expect test offset next available for retry",
TEST_OFFSET, next);
+ assertTrue("message should be ready for retry immediately",
manager.shouldRetryMsg(TEST_OFFSET));
+
+ manager.retryStarted(TEST_OFFSET);
+ next = manager.nextFailedMessageToRetry();
+ assertNull("expect no message ready after retried", next);
+ assertFalse("message should not be ready after retried",
manager.shouldRetryMsg(TEST_OFFSET));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testRetryWithoutFail() throws Exception {
+ ExponentialBackoffMsgRetryManager manager = new
ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ manager.retryStarted(TEST_OFFSET);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testFailRetryRetry() throws Exception {
+ ExponentialBackoffMsgRetryManager manager = new
ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ manager.failed(TEST_OFFSET);
+ try {
+ manager.retryStarted(TEST_OFFSET);
+ } catch (IllegalStateException ise) {
+ fail("IllegalStateException unexpected here: " + ise);
+ }
+
+ assertFalse("message should not be ready for retry",
manager.shouldRetryMsg(TEST_OFFSET));
+ manager.retryStarted(TEST_OFFSET);
+ }
+
+ @Test
+ public void testMaxBackoff() throws Exception {
+ final long initial = 10;
+ final double mult = 2d;
+ final long max = 20;
+ ExponentialBackoffMsgRetryManager manager = new
ExponentialBackoffMsgRetryManager(initial, mult, max);
+
+ long expectedWaitTime = initial;
+ for (long i = 0L; i < 4L; ++i) {
+ manager.failed(TEST_OFFSET);
+
+ Thread.sleep((expectedWaitTime + 1L) / 2L);
+ assertFalse("message should not be ready for retry yet",
manager.shouldRetryMsg(TEST_OFFSET));
+
+ Thread.sleep((expectedWaitTime + 1L) / 2L);
+ Long next = manager.nextFailedMessageToRetry();
+ assertEquals("expect test offset next available for retry",
TEST_OFFSET, next);
+ assertTrue("message should be ready for retry",
manager.shouldRetryMsg(TEST_OFFSET));
+
+ manager.retryStarted(TEST_OFFSET);
+ expectedWaitTime = Math.min((long) (expectedWaitTime * mult), max);
+ }
+ }
+
+ @Test
+ public void testFailThenAck() throws Exception {
+ ExponentialBackoffMsgRetryManager manager = new
ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ manager.failed(TEST_OFFSET);
+ assertTrue("message should be ready for retry",
manager.shouldRetryMsg(TEST_OFFSET));
+
+ manager.acked(TEST_OFFSET);
+
+ Long next = manager.nextFailedMessageToRetry();
+ assertNull("expect no message ready after acked", next);
+ assertFalse("message should not be ready after acked",
manager.shouldRetryMsg(TEST_OFFSET));
+ }
+
+ @Test
+ public void testAckThenFail() throws Exception {
+ ExponentialBackoffMsgRetryManager manager = new
ExponentialBackoffMsgRetryManager(0, 0d, 0);
+ manager.acked(TEST_OFFSET);
+ assertFalse("message should not be ready after acked",
manager.shouldRetryMsg(TEST_OFFSET));
+
+ manager.failed(TEST_OFFSET);
+
+ Long next = manager.nextFailedMessageToRetry();
+ assertEquals("expect test offset next available for retry",
TEST_OFFSET, next);
+ assertTrue("message should be ready for retry",
manager.shouldRetryMsg(TEST_OFFSET));
+ }
+}
\ No newline at end of file