Github user rick-kilgore commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/254#discussion_r17395251
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---
    @@ -261,4 +283,45 @@ public KafkaMessageId(Partition partition, long 
offset) {
                 this.offset = offset;
             }
         }
    +
    +    /**
    +     * A MessageRetryRecord holds the data of how many times a message has
    +     * failed and been retried, and when the last failure occurred.  It can
    +     * determine whether it is ready to be retried by employing an 
exponential
    +     * back-off calculation using config values stored in SpoutConfig:
    +     * <ul>
    +     *  <li>retryInitialDelayMs - time to delay before the first retry</li>
    +     *  <li>retryDelayMultiplier - multiplier by which to increase the 
delay for each subsequent retry</li>
    +     *  <li>retryDelayMaxMs - maximum retry delay (once this delay time is 
reached, subsequent retries will
    +     *                        delay for this amount of time every time)
    +     *  </li>
    +     * </ul>
    +     */
    +    class MessageRetryRecord {
    +        private final int retryNum;
    +        private final long retryTimeUTC;
    +
    +        public MessageRetryRecord() {
    +            this(1);
    +        }
    +
    +        private MessageRetryRecord(int retryNum) {
    +            this.retryNum = retryNum;
    +            this.retryTimeUTC = new Date().getTime() + 
calculateRetryDelay();
    +        }
    +
    +        public MessageRetryRecord createNextRetryRecord() {
    +            return new MessageRetryRecord(this.retryNum + 1);
    +        }
    +
    +        private long calculateRetryDelay() {
    +            double delayMultiplier = 
Math.pow(_spoutConfig.retryDelayMultiplier, this.retryNum - 1);
    +            long delayThisRetryMs = (long) 
(_spoutConfig.retryInitialDelayMs * delayMultiplier);
    +            return Math.min(delayThisRetryMs, 
_spoutConfig.retryDelayMaxMs);
    +        }
    +
    +        public boolean isReadyForRetry() {
    +            return new Date().getTime() > this.retryTimeUTC;
    --- End diff --
    
    done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to