STORM-1705: Cap number of retries for a failed message Signed-off-by: P. Taylor Goetz <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fd51eacb Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fd51eacb Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fd51eacb Branch: refs/heads/master Commit: fd51eacb58c52d7ed35ecbf52b7e6a9509fe1a7f Parents: 0b54767 Author: Abhishek Agarwal <[email protected]> Authored: Tue Apr 12 15:28:06 2016 +0530 Committer: P. Taylor Goetz <[email protected]> Committed: Fri Jun 10 10:36:15 2016 -0400 ---------------------------------------------------------------------- .../ExponentialBackoffMsgRetryManager.java | 38 +++++--- .../storm/kafka/FailedMsgRetryManager.java | 59 ++++++++++-- .../apache/storm/kafka/PartitionManager.java | 32 +++++-- .../jvm/org/apache/storm/kafka/SpoutConfig.java | 2 + .../ExponentialBackoffMsgRetryManagerTest.java | 99 +++++++++++++------- 5 files changed, 171 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java index f86d624..7b5f5dd 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java @@ -27,17 +27,25 @@ import java.util.concurrent.ConcurrentHashMap; public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager { - private final long retryInitialDelayMs; - private final double retryDelayMultiplier; - private final long retryDelayMaxMs; + private long retryInitialDelayMs; + private double retryDelayMultiplier; + private long retryDelayMaxMs; + private int retryLimit; - private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator()); - private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>(); + private Queue<MessageRetryRecord> waiting; + private Map<Long,MessageRetryRecord> records; - public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) { - this.retryInitialDelayMs = retryInitialDelayMs; - this.retryDelayMultiplier = retryDelayMultiplier; - this.retryDelayMaxMs = retryDelayMaxMs; + public ExponentialBackoffMsgRetryManager() { + + } + + public void prepare(SpoutConfig spoutConfig) { + this.retryInitialDelayMs = spoutConfig.retryInitialDelayMs; + this.retryDelayMultiplier = spoutConfig.retryDelayMultiplier; + this.retryDelayMaxMs = spoutConfig.retryDelayMaxMs; + this.retryLimit = spoutConfig.retryLimit; + this.waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator()); + this.records = new ConcurrentHashMap<Long,MessageRetryRecord>(); } @Override @@ -86,7 +94,7 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager } @Override - public boolean shouldRetryMsg(Long offset) { + public boolean shouldReEmitMsg(Long offset) { MessageRetryRecord record = this.records.get(offset); return record != null && this.waiting.contains(record) && @@ -94,7 +102,15 @@ public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager } @Override - public Set<Long> clearInvalidMessages(Long kafkaOffset) { + public boolean retryFurther(Long offset) { + MessageRetryRecord record = this.records.get(offset); + return ! (record != null && + this.waiting.contains(record) && + this.retryLimit <= record.retryNum); + } + + @Override + public Set<Long> clearOffsetsBefore(Long kafkaOffset) { Set<Long> invalidOffsets = new HashSet<Long>(); for(Long offset : records.keySet()){ if(offset < kafkaOffset){ http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java index e9a7092..9a3b19f 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java @@ -17,13 +17,58 @@ */ package org.apache.storm.kafka; +import java.io.Serializable; import java.util.Set; -public interface FailedMsgRetryManager { - public void failed(Long offset); - public void acked(Long offset); - public void retryStarted(Long offset); - public Long nextFailedMessageToRetry(); - public boolean shouldRetryMsg(Long offset); - public Set<Long> clearInvalidMessages(Long kafkaOffset); +public interface FailedMsgRetryManager extends Serializable { + + /** + * Initialization + */ + void prepare(SpoutConfig spoutConfig); + + /** + * Message corresponding to the offset failed in kafka spout. + * @param offset + */ + void failed(Long offset); + + /** + * Message corresponding to the offset, was acked to kafka spout. + * @param offset + */ + void acked(Long offset); + + /** + * Message corresponding to the offset, has been re-emitted and under transit. + * @param offset + */ + void retryStarted(Long offset); + + /** + * The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset + * and resend them, except completed messages. + * @return + */ + Long nextFailedMessageToRetry(); + + /** + * @param offset + * @return True if the message corresponding to the offset should be emitted NOW. False otherwise. + */ + boolean shouldReEmitMsg(Long offset); + + /** + * Spout will clean up the state for this offset if false is returned. + * @param offset + * @return True if the message will be retried again. False otherwise. + */ + boolean retryFurther(Long offset); + + /** + * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka. + * @param kafkaOffset + * @return Set of offsets removed. + */ + Set<Long> clearOffsetsBefore(Long kafkaOffset); } http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index f8d2c41..f42b8f2 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -47,6 +47,9 @@ public class PartitionManager { private final CountMetric _fetchAPIMessageCount; // Count of messages which could not be emitted or retried because they were deleted from kafka private final CountMetric _lostMessageCount; + // Count of messages which were not retried because failedMsgRetryManager didn't consider offset eligible for + // retry + private final CountMetric _messageIneligibleForRetryCount; Long _emittedToOffset; // _pending key = Kafka offset, value = time at which the message was first submitted to the topology private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>(); @@ -73,9 +76,14 @@ public class PartitionManager { _stormConf = stormConf; numberAcked = numberFailed = 0; - _failedMsgRetryManager = new ExponentialBackoffMsgRetryManager(_spoutConfig.retryInitialDelayMs, - _spoutConfig.retryDelayMultiplier, - _spoutConfig.retryDelayMaxMs); + try { + _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance(); + _failedMsgRetryManager.prepare(spoutConfig); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>", + FailedMsgRetryManager.class, + spoutConfig.failedMsgRetryManagerClass), e); + } String jsonTopologyId = null; Long jsonOffset = null; @@ -121,6 +129,7 @@ public class PartitionManager { _fetchAPICallCount = new CountMetric(); _fetchAPIMessageCount = new CountMetric(); _lostMessageCount = new CountMetric(); + _messageIneligibleForRetryCount = new CountMetric(); } public Map getMetricsDataMap() { @@ -130,6 +139,7 @@ public class PartitionManager { ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset()); ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset()); ret.put(_partition + "/lostMessageCount", _lostMessageCount.getValueAndReset()); + ret.put(_partition + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset()); return ret; } @@ -198,7 +208,7 @@ public class PartitionManager { // all the failed offsets, that are earlier than actual EarliestTime // offset, since they are anyway not there. // These calls to broker API will be then saved. - Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(offset); + Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset); // Omitted messages have not been acked and may be lost if (null != omitted) { @@ -229,14 +239,14 @@ public class PartitionManager { // Skip any old offsets. continue; } - if (processingNewTuples || this._failedMsgRetryManager.shouldRetryMsg(cur_offset)) { + if (processingNewTuples || this._failedMsgRetryManager.shouldReEmitMsg(cur_offset)) { numMessages += 1; if (!_pending.containsKey(cur_offset)) { _pending.put(cur_offset, System.currentTimeMillis()); } _waitingToEmit.add(msg); _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset); - if (_failedMsgRetryManager.shouldRetryMsg(cur_offset)) { + if (_failedMsgRetryManager.shouldReEmitMsg(cur_offset)) { this._failedMsgRetryManager.retryStarted(cur_offset); } } @@ -273,7 +283,15 @@ public class PartitionManager { throw new RuntimeException("Too many tuple failures"); } - this._failedMsgRetryManager.failed(offset); + // Offset may not be considered for retry by failedMsgRetryManager + if (this._failedMsgRetryManager.retryFurther(offset)) { + this._failedMsgRetryManager.failed(offset); + } else { + // state for the offset should be cleaned up + _messageIneligibleForRetryCount.incr(); + _pending.remove(offset); + this._failedMsgRetryManager.acked(offset); + } } } http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java index 1ac41c8..415ce0a 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java @@ -37,6 +37,8 @@ public class SpoutConfig extends KafkaConfig implements Serializable { public long retryInitialDelayMs = 0; public double retryDelayMultiplier = 1.0; public long retryDelayMaxMs = 60 * 1000; + public int retryLimit = Integer.MAX_VALUE; + public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName(); public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { super(hosts, topic); http://git-wip-us.apache.org/repos/asf/storm/blob/fd51eacb/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java index 8fa6564..f2815e2 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ExponentialBackoffMsgRetryManagerTest.java @@ -17,14 +17,14 @@ */ package org.apache.storm.kafka; +import org.junit.Test; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import org.junit.Test; - public class ExponentialBackoffMsgRetryManagerTest { private static final Long TEST_OFFSET = 101L; @@ -34,52 +34,54 @@ public class ExponentialBackoffMsgRetryManagerTest { @Test public void testImmediateRetry() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + + + ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE); manager.failed(TEST_OFFSET); Long next = manager.nextFailedMessageToRetry(); assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET)); + assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET)); manager.retryStarted(TEST_OFFSET); manager.failed(TEST_OFFSET); next = manager.nextFailedMessageToRetry(); assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET)); + assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET)); } @Test public void testSingleDelay() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(100, 1d, 1000); + ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(100, 1d, 1000, Integer.MAX_VALUE); manager.failed(TEST_OFFSET); Thread.sleep(5); Long next = manager.nextFailedMessageToRetry(); assertNull("expect no message ready for retry yet", next); - assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET)); + assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET)); Thread.sleep(100); next = manager.nextFailedMessageToRetry(); assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); } @Test public void testExponentialBackoff() throws Exception { final long initial = 10; final double mult = 2d; - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, initial * 10); + ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, initial * 10, Integer.MAX_VALUE); long expectedWaitTime = initial; for (long i = 0L; i < 3L; ++i) { manager.failed(TEST_OFFSET); Thread.sleep((expectedWaitTime + 1L) / 2L); - assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET)); + assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET)); Thread.sleep((expectedWaitTime + 1L) / 2L); Long next = manager.nextFailedMessageToRetry(); assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); manager.retryStarted(TEST_OFFSET); expectedWaitTime *= mult; @@ -91,7 +93,7 @@ public class ExponentialBackoffMsgRetryManagerTest { final long initial = 10; final double mult = 2d; final long max = 20; - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max); + ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, Integer.MAX_VALUE); manager.failed(TEST_OFFSET); Thread.sleep(initial); @@ -104,8 +106,8 @@ public class ExponentialBackoffMsgRetryManagerTest { // so TEST_OFFSET2 should come first Thread.sleep(initial * 2); - assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); - assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2)); + assertTrue("message "+TEST_OFFSET+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); + assertTrue("message "+TEST_OFFSET2+"should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2)); Long next = manager.nextFailedMessageToRetry(); assertEquals("expect first message to retry is "+TEST_OFFSET2, TEST_OFFSET2, next); @@ -129,27 +131,27 @@ public class ExponentialBackoffMsgRetryManagerTest { @Test public void testQueriesAfterRetriedAlready() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE); manager.failed(TEST_OFFSET); Long next = manager.nextFailedMessageToRetry(); assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry immediately", manager.shouldRetryMsg(TEST_OFFSET)); + assertTrue("message should be ready for retry immediately", manager.shouldReEmitMsg(TEST_OFFSET)); manager.retryStarted(TEST_OFFSET); next = manager.nextFailedMessageToRetry(); assertNull("expect no message ready after retried", next); - assertFalse("message should not be ready after retried", manager.shouldRetryMsg(TEST_OFFSET)); + assertFalse("message should not be ready after retried", manager.shouldReEmitMsg(TEST_OFFSET)); } @Test(expected = IllegalStateException.class) public void testRetryWithoutFail() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE); manager.retryStarted(TEST_OFFSET); } @Test(expected = IllegalStateException.class) public void testFailRetryRetry() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE); manager.failed(TEST_OFFSET); try { manager.retryStarted(TEST_OFFSET); @@ -157,7 +159,7 @@ public class ExponentialBackoffMsgRetryManagerTest { fail("IllegalStateException unexpected here: " + ise); } - assertFalse("message should not be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + assertFalse("message should not be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); manager.retryStarted(TEST_OFFSET); } @@ -166,19 +168,19 @@ public class ExponentialBackoffMsgRetryManagerTest { final long initial = 100; final double mult = 2d; final long max = 2000; - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(initial, mult, max); + ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, Integer.MAX_VALUE); long expectedWaitTime = initial; for (long i = 0L; i < 4L; ++i) { manager.failed(TEST_OFFSET); Thread.sleep((expectedWaitTime + 1L) / 2L); - assertFalse("message should not be ready for retry yet", manager.shouldRetryMsg(TEST_OFFSET)); + assertFalse("message should not be ready for retry yet", manager.shouldReEmitMsg(TEST_OFFSET)); Thread.sleep((expectedWaitTime + 1L) / 2L); Long next = manager.nextFailedMessageToRetry(); assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); manager.retryStarted(TEST_OFFSET); expectedWaitTime = Math.min((long) (expectedWaitTime * mult), max); @@ -187,42 +189,42 @@ public class ExponentialBackoffMsgRetryManagerTest { @Test public void testFailThenAck() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE); manager.failed(TEST_OFFSET); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); manager.acked(TEST_OFFSET); Long next = manager.nextFailedMessageToRetry(); assertNull("expect no message ready after acked", next); - assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET)); + assertFalse("message should not be ready after acked", manager.shouldReEmitMsg(TEST_OFFSET)); } @Test public void testAckThenFail() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE); manager.acked(TEST_OFFSET); - assertFalse("message should not be ready after acked", manager.shouldRetryMsg(TEST_OFFSET)); + assertFalse("message should not be ready after acked", manager.shouldReEmitMsg(TEST_OFFSET)); manager.failed(TEST_OFFSET); Long next = manager.nextFailedMessageToRetry(); assertEquals("expect test offset next available for retry", TEST_OFFSET, next); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); + assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); } @Test public void testClearInvalidMessages() throws Exception { - ExponentialBackoffMsgRetryManager manager = new ExponentialBackoffMsgRetryManager(0, 0d, 0); + ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(0, 0d, 0, Integer.MAX_VALUE); manager.failed(TEST_OFFSET); manager.failed(TEST_OFFSET2); manager.failed(TEST_OFFSET3); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET)); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET2)); - assertTrue("message should be ready for retry", manager.shouldRetryMsg(TEST_OFFSET3)); + assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET)); + assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET2)); + assertTrue("message should be ready for retry", manager.shouldReEmitMsg(TEST_OFFSET3)); - manager.clearInvalidMessages(TEST_NEW_OFFSET); + manager.clearOffsetsBefore(TEST_NEW_OFFSET); Long next = manager.nextFailedMessageToRetry(); assertEquals("expect test offset next available for retry", TEST_OFFSET3, next); @@ -232,4 +234,33 @@ public class ExponentialBackoffMsgRetryManagerTest { assertNull("expect no message ready after acked", next); } + @Test + public void testMaxRetry() throws Exception { + final long initial = 100; + final double mult = 2d; + final long max = 2000; + final int maxRetries = 2; + ExponentialBackoffMsgRetryManager manager = buildExponentialBackoffMsgRetryManager(initial, mult, max, maxRetries); + assertTrue(manager.retryFurther(TEST_OFFSET)); + manager.failed(TEST_OFFSET); + + assertTrue(manager.retryFurther(TEST_OFFSET)); + manager.failed(TEST_OFFSET); + + assertFalse(manager.retryFurther(TEST_OFFSET)); + } + + private ExponentialBackoffMsgRetryManager buildExponentialBackoffMsgRetryManager(long retryInitialDelayMs, + double retryDelayMultiplier, + long retryDelayMaxMs, + int retryLimit) { + SpoutConfig spoutConfig = new SpoutConfig(null, null, null, null); + spoutConfig.retryInitialDelayMs = retryInitialDelayMs; + spoutConfig.retryDelayMultiplier = retryDelayMultiplier; + spoutConfig.retryDelayMaxMs = retryDelayMaxMs; + spoutConfig.retryLimit = retryLimit; + ExponentialBackoffMsgRetryManager exponentialBackoffMsgRetryManager = new ExponentialBackoffMsgRetryManager(); + exponentialBackoffMsgRetryManager.prepare(spoutConfig); + return exponentialBackoffMsgRetryManager; + } }
