stop if retries hit TOPOLOGY_MESSAGE_TIMEOUT_SECS

Print a loud error message with a stack trace if the retries run up against the TOPOLOGY_MESSAGE_TIMEOUT_SECS timeout, and stop retrying.
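In essence, the change timestamps each Kafka offset when it is first emitted to the topology, and refuses to schedule a retry whose backoff time would fall outside Storm's message timeout. A minimal sketch of that check, with illustrative names (the real logic is MessageRetryRecord.validateRetryTime() in the diff below):

    // Sketch only: true if a retry scheduled for proposedRetryTimeMs would
    // land after Storm has already timed the tuple out, measured from when
    // the message was first emitted to the topology (firstEmitTimeMs).
    static boolean retryWouldExceedTimeout(long firstEmitTimeMs,
                                           long proposedRetryTimeMs,
                                           int messageTimeoutSecs) {
        return proposedRetryTimeMs - firstEmitTimeMs > messageTimeoutSecs * 1000L;
    }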
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/882dfc5f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/882dfc5f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/882dfc5f

Branch: refs/heads/master
Commit: 882dfc5fb7b3b26a17e4a1ee5f91220cd80fc4dd
Parents: 3c699e3
Author: Rick Kilgore <[email protected]>
Authored: Thu Oct 2 23:12:01 2014 -0700
Committer: Rick Kilgore <[email protected]>
Committed: Thu Oct 2 23:12:01 2014 -0700

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/PartitionManager.java | 96 +++++++++++++++++---
 1 file changed, 81 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/882dfc5f/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 6b0930b..cff6df0 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -38,13 +38,24 @@ import java.util.*;
 
 public class PartitionManager {
     public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);
+    private static final String TIMES_UP_MSG =
+            "Retry logic in your topology is taking longer to complete than is allowed by your"
+            + " Storm Config setting TOPOLOGY_MESSAGE_TIMEOUT_SECS (%s seconds). (i.e., you have"
+            + " called OutputCollector.fail() too many times for this message). KafkaSpout has"
+            + " aborted next retry attempt (retry %s) for the Kafka message at offset %s since it"
+            + " would occur after this timeout.";
+    private static final long TIMEOUT_RESET_VALUE = -1L;
+
     private final CombinedMetric _fetchAPILatencyMax;
     private final ReducedMetric _fetchAPILatencyMean;
     private final CountMetric _fetchAPICallCount;
     private final CountMetric _fetchAPIMessageCount;
     Long _emittedToOffset;
-    SortedSet<Long> _pending = new TreeSet<Long>();
+    // _pending key = Kafka offset, value = time at which the message was first submitted to the topology
+    private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
     private SortedSet<Long> failed = new TreeSet<Long>();
+
+    // retryRecords key = Kafka offset, value = retry info for the given message
     private Map<Long,MessageRetryRecord> retryRecords = new HashMap<Long,MessageRetryRecord>();
     Long _committedTo;
     LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
@@ -190,7 +201,9 @@ public class PartitionManager {
                 }
                 if (!had_failed || failedReady.contains(cur_offset)) {
                     numMessages += 1;
-                    _pending.add(cur_offset);
+                    if (!_pending.containsKey(cur_offset)) {
+                        _pending.put(cur_offset, System.currentTimeMillis());
+                    }
                     _waitingToEmit.add(new MessageAndRealOffset(msg.message(), cur_offset));
                     _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
                     if (had_failed) {
@@ -203,9 +216,9 @@ public class PartitionManager {
     }
 
     public void ack(Long offset) {
-        if (!_pending.isEmpty() && _pending.first() < offset - _spoutConfig.maxOffsetBehind) {
+        if (!_pending.isEmpty() && _pending.firstKey() < offset - _spoutConfig.maxOffsetBehind) {
             // Too many things pending!
-            _pending.headSet(offset - _spoutConfig.maxOffsetBehind).clear();
+            _pending.headMap(offset - _spoutConfig.maxOffsetBehind).clear();
         }
         _pending.remove(offset);
         retryRecords.remove(offset);
@@ -221,15 +234,23 @@ public class PartitionManager {
             );
         } else {
             LOG.debug("failing at offset=" + offset + " with _pending.size()=" + _pending.size() + " pending and _emittedToOffset=" + _emittedToOffset);
-            failed.add(offset);
-            MessageRetryRecord retryRecord = retryRecords.get(offset);
-            retryRecords.put(offset, retryRecord == null
-                    ? new MessageRetryRecord()
-                    : retryRecord.createNextRetryRecord());
             numberFailed++;
             if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
                 throw new RuntimeException("Too many tuple failures");
             }
+
+            try {
+                MessageRetryRecord retryRecord = retryRecords.get(offset);
+                retryRecord = retryRecord == null
+                        ? new MessageRetryRecord(offset)
+                        : retryRecord.createNextRetryRecord();
+
+                retryRecords.put(offset, retryRecord);
+                failed.add(offset);
+
+            } catch (MessageRetryRecord.AvailableRetryTimeExceededException e) {
+                LOG.error("cannot retry", e);
+            }
         }
     }
 
@@ -262,7 +283,7 @@ public class PartitionManager {
         if (_pending.isEmpty()) {
            return _emittedToOffset;
         } else {
-            return _pending.first();
+            return _pending.firstKey();
         }
     }
 
@@ -298,20 +319,59 @@ public class PartitionManager {
      * </ul>
      */
     class MessageRetryRecord {
+        private final long offset;
         private final int retryNum;
         private final long retryTimeUTC;
 
-        public MessageRetryRecord() {
-            this(1);
+        public MessageRetryRecord(long offset) throws AvailableRetryTimeExceededException {
+            this(offset, 1);
         }
 
-        private MessageRetryRecord(int retryNum) {
+        private MessageRetryRecord(long offset, int retryNum) throws AvailableRetryTimeExceededException {
+            this.offset = offset;
             this.retryNum = retryNum;
             this.retryTimeUTC = System.currentTimeMillis() + calculateRetryDelay();
+            validateRetryTime();
+        }
+
+        /**
+         * Create a MessageRetryRecord for the next retry that should occur after this one.
+         * @return MessageRetryRecord with the next retry time, or null to indicate that another
+         *         retry should not be performed.  The latter case can happen if we are about to
+         *         run into the backtype.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS in the Storm
+         *         configuration.
+         */
+        public MessageRetryRecord createNextRetryRecord() throws AvailableRetryTimeExceededException {
+            return new MessageRetryRecord(this.offset, this.retryNum + 1);
         }
 
-        public MessageRetryRecord createNextRetryRecord() {
-            return new MessageRetryRecord(this.retryNum + 1);
+        private void validateRetryTime() throws AvailableRetryTimeExceededException {
+            long stormStartTime = PartitionManager.this._pending.get(this.offset);
+
+            if (stormStartTime == TIMEOUT_RESET_VALUE) {
+                // This is a resubmission from the Storm framework after Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
+                // has elapsed.  Restart my timer.
+                PartitionManager.this._pending.put(this.offset, System.currentTimeMillis());
+
+            } else {
+                int timeoutSeconds = Utils.getInt(_stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+                if (this.retryTimeUTC - stormStartTime > timeoutSeconds * 1000) {
+
+                    // Prepare for when the Storm framework calls fail()
+                    _pending.put(this.offset, TIMEOUT_RESET_VALUE);
+
+                    throw new AvailableRetryTimeExceededException(String.format(TIMES_UP_MSG,
+                                                                                timeoutSeconds,
+                                                                                this.retryNum,
+                                                                                this.offset));
+
+                } else {
+                    LOG.warn(String.format("allowing another retry: start=%s, retryTime=%s, timeoutSeconds=%s",
+                                           (stormStartTime / 1000) % 1000,
+                                           (this.retryTimeUTC / 1000) % 1000,
+                                           timeoutSeconds));
+                }
+            }
         }
 
         private long calculateRetryDelay() {
@@ -323,5 +383,11 @@ public class PartitionManager {
         public boolean isReadyForRetry() {
             return System.currentTimeMillis() > this.retryTimeUTC;
         }
+
+        class AvailableRetryTimeExceededException extends Exception {
+            public AvailableRetryTimeExceededException(String msg) {
+                super(msg);
+            }
+        }
     }
 }
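For reference, the window that now bounds these retries is Storm's standard per-tuple message timeout. An illustrative snippet (not part of this commit) showing how a topology sets it:

    import backtype.storm.Config;

    Config conf = new Config();
    // After this change, KafkaSpout aborts further retries for a failed
    // message once the next retry attempt would fall outside this window.
    conf.setMessageTimeoutSecs(60);  // Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS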
