[ https://issues.apache.org/jira/browse/STORM-495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14313348#comment-14313348 ]
ASF GitHub Bot commented on STORM-495:
--------------------------------------
Github user kenshih commented on a diff in the pull request:
https://github.com/apache/storm/pull/254#discussion_r24383036
--- Diff: external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java ---
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package storm.kafka;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
+
+ private final long retryInitialDelayMs;
+ private final double retryDelayMultiplier;
+ private final long retryDelayMaxMs;
+
+ private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
+ private Map<Long,MessageRetryRecord> records = new HashMap<Long,MessageRetryRecord>();
+
+ public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
+ this.retryInitialDelayMs = retryInitialDelayMs;
+ this.retryDelayMultiplier = retryDelayMultiplier;
+ this.retryDelayMaxMs = retryDelayMaxMs;
+ }
+
+ @Override
+ public void failed(Long offset) {
+ MessageRetryRecord oldRecord = this.records.get(offset);
+ MessageRetryRecord newRecord = oldRecord == null ?
+ new MessageRetryRecord(offset) :
+ oldRecord.createNextRetryRecord();
+ this.records.put(offset, newRecord);
+ this.waiting.add(newRecord);
+ }
+
+ @Override
+ public void acked(Long offset) {
+ MessageRetryRecord record = this.records.remove(offset);
+ if (record != null) {
+ this.waiting.remove(record);
+ }
+ }
+
+ @Override
+ public void retryStarted(Long offset) {
+ MessageRetryRecord record = this.records.get(offset);
+ if (record == null || !this.waiting.contains(record)) {
+ throw new IllegalStateException("cannot retry a message that has not failed");
+ } else {
+ this.waiting.remove(record);
+ }
+ }
+
+ @Override
+ public Long nextFailedMessageToRetry() {
+ if (this.waiting.size() > 0) {
+ MessageRetryRecord first = this.waiting.peek();
+ if (System.currentTimeMillis() >= first.retryTimeUTC) {
+ if (this.records.containsKey(first.offset)) {
+ return first.offset;
+ } else {
+ // defensive programming - should be impossible
+ this.waiting.remove(first);
+ return nextFailedMessageToRetry();
+ }
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public boolean shouldRetryMsg(Long offset) {
+ MessageRetryRecord record = this.records.get(offset);
+ return record != null &&
+ this.waiting.contains(record) &&
+ System.currentTimeMillis() >= record.retryTimeUTC;
+ }
+
+ /**
+ * A MessageRetryRecord holds the data of how many times a message has
+ * failed and been retried, and when the last failure occurred. It can
+ * determine whether it is ready to be retried by employing an exponential
+ * back-off calculation using config values stored in SpoutConfig:
+ * <ul>
+ * <li>retryInitialDelayMs - time to delay before the first retry</li>
+ * <li>retryDelayMultiplier - multiplier by which to increase the delay for each subsequent retry</li>
+ * <li>retryDelayMaxMs - maximum retry delay (once this delay time is reached, subsequent retries will
+ * delay for this amount of time every time)
+ * </li>
+ * </ul>
+ */
+ class MessageRetryRecord {
+ private final long offset;
+ private final int retryNum;
+ private final long retryTimeUTC;
+
+ public MessageRetryRecord(long offset) {
+ this(offset, 1);
+ }
+
+ private MessageRetryRecord(long offset, int retryNum) {
+ this.offset = offset;
+ this.retryNum = retryNum;
+ this.retryTimeUTC = System.currentTimeMillis() + calculateRetryDelay();
+ }
+
+ /**
+ * Create a MessageRetryRecord for the next retry that should occur after this one.
+ * @return MessageRetryRecord with the next retry time, or null to indicate that another
+ * retry should not be performed. The latter case can happen if we are about to
+ * run into the backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS in the Storm
+ * configuration.
+ */
+ public MessageRetryRecord createNextRetryRecord() {
+ return new MessageRetryRecord(this.offset, this.retryNum + 1);
+ }
+
+ private long calculateRetryDelay() {
+ double delayMultiplier = Math.pow(retryDelayMultiplier, this.retryNum - 1);
+ double delay = retryInitialDelayMs * delayMultiplier;
+ Long maxLong = Long.MAX_VALUE;
+ long delayThisRetryMs = delay >= maxLong.doubleValue()
+ ? maxLong
+ : (long) delay;
+ return Math.min(delayThisRetryMs, retryDelayMaxMs);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ return (other instanceof MessageRetryRecord
+ && this.offset == ((MessageRetryRecord) other).offset);
+ }
+
+ @Override
+ public int hashCode() {
+ return Long.valueOf(this.offset).hashCode();
+ }
+ }
+
+ class RetryTimeComparator implements Comparator<MessageRetryRecord> {
+
+ @Override
+ public int compare(MessageRetryRecord record1, MessageRetryRecord record2) {
+ return Long.compare(record1.retryTimeUTC, record2.retryTimeUTC);
--- End diff ---
Note: this Long.compare(long, long) signature was added in Java 7, so the
project pom should probably update source/target to "1.7" instead of the
current value of "1.6".
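If keeping the 1.6 source/target is preferred, a Java 6-compatible comparison
along these lines could be used in RetryTimeComparator instead; this is only a
minimal sketch, not part of the patch:

    @Override
    public int compare(MessageRetryRecord record1, MessageRetryRecord record2) {
        // Java 6-compatible equivalent of Long.compare(a, b): compare the
        // values directly instead of subtracting, which could overflow.
        long a = record1.retryTimeUTC;
        long b = record2.retryTimeUTC;
        return (a < b) ? -1 : ((a > b) ? 1 : 0);
    }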
> Add delayed retries to KafkaSpout
> ---------------------------------
>
> Key: STORM-495
> URL: https://issues.apache.org/jira/browse/STORM-495
> Project: Apache Storm
> Issue Type: Improvement
> Affects Versions: 0.9.3
> Environment: all environments
> Reporter: Rick Kilgore
> Assignee: Rick Kilgore
> Priority: Minor
> Labels: kafka, retry
> Fix For: 0.10.0
>
>
> If a tuple in the topology originates from the KafkaSpout from the
> external/storm-kafka sources, and if a bolt in the topology indicates a
> failure by calling fail() on its OutputCollector, the KafkaSpout will
> immediately retry the message.
> We wish to use this failure and retry behavior in our ingestion system
> whenever we experience a recoverable error from a downstream system, such as
> a 500 or 503 error from a service we depend on. But with the current
> KafkaSpout behavior, doing so results in a tight loop where we retry several
> times over a few seconds and then give up. I want to be able to delay retry
> to give the downstream service some time to recover. Ideally, I would like
> to have configurable, exponential backoff retry.
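To illustrate the configurable backoff requested above, using the manager from
the diff (the delay values here are hypothetical, chosen only for the example):

    // Hypothetical configuration: 1 s initial delay, doubling each retry,
    // capped at 60 s.
    ExponentialBackoffMsgRetryManager retryManager =
            new ExponentialBackoffMsgRetryManager(1000L, 2.0d, 60000L);

    // After the Nth failure of an offset, calculateRetryDelay() yields
    // min(1000 * 2^(N-1), 60000) ms: 1000, 2000, 4000, ... capped at 60000.
    retryManager.failed(42L);   // offset 42 becomes retriable after ~1 s
    retryManager.failed(42L);   // fails again: next retry delayed ~2 s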
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)