Revert "STORM-616 : removing unintended changes." This reverts commit d260759ac203383e27668a7cb7090926029f7406.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ca235e6c Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ca235e6c Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ca235e6c Branch: refs/heads/master Commit: ca235e6cb18006bbbac56361639309e73c196718 Parents: 079deda Author: Parth Brahmbhatt <[email protected]> Authored: Tue Jan 6 17:43:58 2015 -0500 Committer: Parth Brahmbhatt <[email protected]> Committed: Tue Jan 6 17:43:58 2015 -0500 ---------------------------------------------------------------------- external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 7 ++++--- .../src/jvm/storm/kafka/UpdateOffsetException.java | 5 ++++- .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java | 10 +++++++++- 3 files changed, 17 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java index 918da74..3165189 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java @@ -180,10 +180,11 @@ public class KafkaUtils { if (fetchResponse.hasError()) { KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId)); if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) { - LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " + + String msg = "Got fetch request with offset out of range: [" + offset + "]; " + "retrying with default start offset time from configuration. " + - "configured start offset time: [" + config.startOffsetTime + "]"); - throw new UpdateOffsetException(); + "configured start offset time: [" + config.startOffsetTime + "]"; + LOG.warn(msg); + throw new UpdateOffsetException(msg); } else { String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]"; LOG.error(message); http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java index 1be7312..5c366ec 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java +++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java @@ -17,6 +17,9 @@ */ package storm.kafka; -public class UpdateOffsetException extends RuntimeException { +public class UpdateOffsetException extends FailedFetchException { + public UpdateOffsetException(String message) { + super(message); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java index 94bf134..34566c5 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java +++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java @@ -33,6 +33,7 @@ import storm.kafka.DynamicPartitionConnections; import storm.kafka.FailedFetchException; import storm.kafka.KafkaUtils; import storm.kafka.Partition; +import storm.kafka.UpdateOffsetException; import storm.trident.operation.TridentCollector; import storm.trident.spout.IOpaquePartitionedTridentSpout; import storm.trident.spout.IPartitionedTridentSpout; @@ -129,7 +130,14 @@ public class TridentKafkaEmitter { private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) { long start = System.nanoTime(); - ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset); + ByteBufferMessageSet msgs = null; + try { + msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset); + } catch (UpdateOffsetException e) { + long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config); + LOG.warn("OffsetOutOfRange, Updating offset from offset = " + offset + " to offset = " + newOffset); + msgs = KafkaUtils.fetchMessages(_config, consumer, partition, newOffset); + } long end = System.nanoTime(); long millis = (end - start) / 1000000; _kafkaMeanFetchLatencyMetric.update(millis);
