Do the time calc only once: do the due time calculation for a MessageRetryRecord when one is created, not every time isReadyForRetry() is called.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2aa03468 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2aa03468 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2aa03468 Branch: refs/heads/master Commit: 2aa03468db3971a11d07048628908074f2a0d3b3 Parents: 79e3efe Author: Rick Kilgore <[email protected]> Authored: Tue Sep 9 16:03:34 2014 -0700 Committer: Rick Kilgore <[email protected]> Committed: Tue Sep 9 16:03:34 2014 -0700 ---------------------------------------------------------------------- .../src/jvm/storm/kafka/PartitionManager.java | 33 +++++++++++--------- 1 file changed, 19 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/2aa03468/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index 02edf31..c66545e 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -223,7 +223,9 @@ public class PartitionManager { LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset); failed.add(offset); MessageRetryRecord retryRecord = retryRecords.get(offset); - retryRecords.put(offset, retryRecord == null ? new MessageRetryRecord() : retryRecord.retryAgainRecord()); + retryRecords.put(offset, retryRecord == null + ? 
new MessageRetryRecord() + : retryRecord.createNextRetryRecord()); numberFailed++; if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) { throw new RuntimeException("Too many tuple failures"); @@ -296,27 +298,30 @@ public class PartitionManager { * </ul> */ class MessageRetryRecord { - private final long failTimeUTC; - private final int attemptsAlreadyPerformed; - - private MessageRetryRecord(int attemptsAlreadyPerformed) { - this.failTimeUTC = new Date().getTime(); - this.attemptsAlreadyPerformed = attemptsAlreadyPerformed; - } + private final int retryNum; + private final long retryTimeUTC; public MessageRetryRecord() { this(1); } - public MessageRetryRecord retryAgainRecord() { - return new MessageRetryRecord(this.attemptsAlreadyPerformed + 1); + private MessageRetryRecord(int retryNum) { + this.retryNum = retryNum; + this.retryTimeUTC = new Date().getTime() + calculateRetryDelay(this.retryNum); } - public boolean isReadyForRetry() { - double delayMultiplier = Math.pow(_spoutConfig.retryDelayMultiplier, this.attemptsAlreadyPerformed - 1); + public MessageRetryRecord createNextRetryRecord() { + return new MessageRetryRecord(this.retryNum + 1); + } + + private long calculateRetryDelay(int retryNum) { + double delayMultiplier = Math.pow(_spoutConfig.retryDelayMultiplier, retryNum - 1); long delayThisRetryMs = (long) (_spoutConfig.retryInitialDelayMs * delayMultiplier); - delayThisRetryMs = Math.min(delayThisRetryMs, _spoutConfig.retryDelayMaxMs); - return new Date().getTime() - this.failTimeUTC > delayThisRetryMs; + return Math.min(delayThisRetryMs, _spoutConfig.retryDelayMaxMs); + } + + public boolean isReadyForRetry() { + return new Date().getTime() > this.retryTimeUTC; } } }
