move new offset calculation to PartitionManager; Don't update metrics on failed fetch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/67b5f56c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/67b5f56c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/67b5f56c Branch: refs/heads/security Commit: 67b5f56c1ff3905c88eb85dbb8985c7bb8342de9 Parents: 1897dee Author: P. Taylor Goetz <[email protected]> Authored: Thu Oct 23 16:27:07 2014 -0400 Committer: P. Taylor Goetz <[email protected]> Committed: Thu Oct 23 16:27:07 2014 -0400 ---------------------------------------------------------------------- external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 5 ++--- external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 5 ++++- .../storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java | 5 ----- 3 files changed, 6 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/67b5f56c/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java index eab80eb..918da74 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java @@ -180,11 +180,10 @@ public class KafkaUtils { if (fetchResponse.hasError()) { KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId)); if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) { - long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime); LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " + "retrying with default start offset time from configuration. 
" + - "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]"); - throw new UpdateOffsetException(startOffset); + "configured start offset time: [" + config.startOffsetTime + "]"); + throw new UpdateOffsetException(); } else { String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]"; LOG.error(message); http://git-wip-us.apache.org/repos/asf/storm/blob/67b5f56c/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index fa5f7e5..d24a49e 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -161,7 +161,10 @@ public class PartitionManager { try { msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset); } catch (UpdateOffsetException e) { - _emittedToOffset = e.startOffset; + _emittedToOffset = KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition, _spoutConfig); + LOG.warn("Using new offset: {}", _emittedToOffset); + // fetch failed, so don't update the metrics + return; } long end = System.nanoTime(); long millis = (end - start) / 1000000; http://git-wip-us.apache.org/repos/asf/storm/blob/67b5f56c/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java index 69d8950..510c8cd 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java +++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java @@ -2,9 +2,4 @@ package storm.kafka; public class UpdateOffsetException 
extends RuntimeException { - public final Long startOffset; - - public UpdateOffsetException(Long _offset) { - this.startOffset = _offset; - } }
