GitHub user harshach commented on a diff in the pull request:
https://github.com/apache/storm/pull/1586#discussion_r72279426
--- Diff:
external/storm-kinesis/src/main/java/org/apache/storm/kinesis/spout/ExponentialBackoffRetrier.java
---
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kinesis.spout;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+public class ExponentialBackoffRetrier implements FailedMessageRetryHandler, Serializable {
+ private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffRetrier.class);
+ // Wait interval for retrying after first failure
+ private final Long initialDelayMillis;
+ // Base for exponential function in seconds for retrying second, third and subsequent failures
+ private final Long baseSeconds;
+ // Maximum number of retries
+ private final Long maxRetries;
+ // map to track number of failures for each kinesis message that failed
+ private Map<KinesisMessageId, Long> failCounts = new HashMap<>();
+ // map to track next retry time for each kinesis message that failed
+ private Map<KinesisMessageId, Long> retryTimes = new HashMap<>();
+ // sorted set of records to be retried based on retry time; the record with the earliest retryTime comes first
+ private SortedSet<KinesisMessageId> retryMessageSet = new TreeSet<>(new RetryTimeComparator());
+
+ /**
+ * No-args constructor that uses defaults of 100 ms for the first retry, max retries of Long.MAX_VALUE and an exponential backoff of Math.pow(2, i-1) secs for
+ * retry i where i = 2, 3, ...
+ */
+ public ExponentialBackoffRetrier () {
+ this(100L, 2L, Long.MAX_VALUE);
+ }
+
+ /**
+ *
+ * @param initialDelayMillis delay in milliseconds for first retry
+ * @param baseSeconds base for exponent function in seconds
+ * @param maxRetries maximum number of retries before the record is discarded/acked
+ */
+ public ExponentialBackoffRetrier (Long initialDelayMillis, Long baseSeconds, Long maxRetries) {
+ this.initialDelayMillis = initialDelayMillis;
+ this.baseSeconds = baseSeconds;
+ this.maxRetries = maxRetries;
+ validate();
+ }
+
+ private void validate () {
+ if (initialDelayMillis < 0) {
+ throw new IllegalArgumentException("initialDelayMillis cannot be negative.");
+ }
+ if (baseSeconds < 0) {
+ throw new IllegalArgumentException("baseSeconds cannot be negative.");
+ }
+ if (maxRetries < 0) {
+ throw new IllegalArgumentException("maxRetries cannot be negative.");
+ }
+ }
+ @Override
+ public boolean failed(KinesisMessageId messageId) {
+ LOG.debug("Handling failed message " + messageId);
+ // if maxRetries is 0, don't retry and return false as per interface contract
+ if (maxRetries == 0) {
+ LOG.debug("maxRetries set to 0. Hence not queueing " + messageId);
+ return false;
+ }
+ // if first failure add it to the count map
+ if (!failCounts.containsKey(messageId)) {
+ failCounts.put(messageId, 0L);
+ }
+ // increment the fail count as we started with 0
+ Long failCount = failCounts.get(messageId);
+ failCounts.put(messageId, ++failCount);
+ // if fail count is greater than maxRetries, discard or ack. e.g. with maxRetries 3, a message can fail at most 4 times
+ if (failCount > maxRetries) {
+ LOG.debug("maxRetries reached so dropping " + messageId);
+ failCounts.remove(messageId);
+ return false;
+ }
+ // if we got this far, add it to the set of messages waiting to be retried, with the next retry time based on how many times it failed
+ retryTimes.put(messageId, getRetryTime(failCount));
+ retryMessageSet.add(messageId);
+ LOG.debug("Scheduled " + messageId + " for retry at " + retryTimes.get(messageId) + " and retry attempt " + failCount);
--- End diff --
I recommend logging this at warn level as well.
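For reference, the retry schedule and cutoff that the comments in this diff describe can be sketched as follows. `getRetryTime` is not part of this excerpt, so `BackoffScheduleSketch`, `delayMillis` and `shouldRetry` are hypothetical helpers written from the comments alone (first retry after `initialDelayMillis`, retry i >= 2 after `baseSeconds`^(i-1) seconds, drop once the fail count exceeds `maxRetries`). They return a relative delay, not whatever timestamp the PR actually stores in `retryTimes`.

```java
/**
 * Hypothetical sketch of the schedule described in the ExponentialBackoffRetrier
 * comments; this is not the PR's getRetryTime implementation.
 */
class BackoffScheduleSketch {

    /** Relative delay in ms before retry number {@code retry} (1-based). */
    static long delayMillis(long retry, long initialDelayMillis, long baseSeconds) {
        if (retry <= 1) {
            // First retry uses the fixed initial delay.
            return initialDelayMillis;
        }
        // Retry i >= 2 waits baseSeconds^(i-1) seconds.
        double millis = Math.pow(baseSeconds, retry - 1) * 1000.0;
        // A double-to-long cast already saturates, but make the cap explicit.
        return millis >= (double) Long.MAX_VALUE ? Long.MAX_VALUE : (long) millis;
    }

    /** Mirrors failed(): never retry when maxRetries is 0, drop once failCount exceeds it. */
    static boolean shouldRetry(long failCount, long maxRetries) {
        return maxRetries > 0 && failCount <= maxRetries;
    }

    public static void main(String[] args) {
        long maxRetries = 3L;
        for (long failCount = 1; failCount <= maxRetries + 1; failCount++) {
            if (shouldRetry(failCount, maxRetries)) {
                System.out.println("failure " + failCount + ": retry in "
                        + delayMillis(failCount, 100L, 2L) + " ms");
            } else {
                System.out.println("failure " + failCount + ": dropped");
            }
        }
        // failure 1: retry in 100 ms
        // failure 2: retry in 2000 ms
        // failure 3: retry in 4000 ms
        // failure 4: dropped
    }
}
```

With the no-args defaults, `maxRetries` is Long.MAX_VALUE, so a message that keeps failing would be rescheduled, and this line logged, effectively indefinitely.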