STORM-616 : removing unintended changes.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d260759a Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d260759a Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d260759a Branch: refs/heads/master Commit: d260759ac203383e27668a7cb7090926029f7406 Parents: ab9f778 Author: Parth Brahmbhatt <[email protected]> Authored: Mon Jan 5 22:31:05 2015 -0500 Committer: Parth Brahmbhatt <[email protected]> Committed: Mon Jan 5 22:31:05 2015 -0500 ---------------------------------------------------------------------- external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 7 +++---- .../src/jvm/storm/kafka/UpdateOffsetException.java | 5 +---- .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java | 10 +--------- 3 files changed, 5 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d260759a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java index 3165189..918da74 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java @@ -180,11 +180,10 @@ public class KafkaUtils { if (fetchResponse.hasError()) { KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId)); if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) { - String msg = "Got fetch request with offset out of range: [" + offset + "]; " + + LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " + "retrying with default start offset time from configuration. 
" + - "configured start offset time: [" + config.startOffsetTime + "]"; - LOG.warn(msg); - throw new UpdateOffsetException(msg); + "configured start offset time: [" + config.startOffsetTime + "]"); + throw new UpdateOffsetException(); } else { String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]"; LOG.error(message); http://git-wip-us.apache.org/repos/asf/storm/blob/d260759a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java index 5c366ec..1be7312 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java +++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java @@ -17,9 +17,6 @@ */ package storm.kafka; -public class UpdateOffsetException extends FailedFetchException { +public class UpdateOffsetException extends RuntimeException { - public UpdateOffsetException(String message) { - super(message); - } } http://git-wip-us.apache.org/repos/asf/storm/blob/d260759a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java index 34566c5..94bf134 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java +++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java @@ -33,7 +33,6 @@ import storm.kafka.DynamicPartitionConnections; import storm.kafka.FailedFetchException; import storm.kafka.KafkaUtils; import storm.kafka.Partition; -import storm.kafka.UpdateOffsetException; import storm.trident.operation.TridentCollector; import 
storm.trident.spout.IOpaquePartitionedTridentSpout; import storm.trident.spout.IPartitionedTridentSpout; @@ -130,14 +129,7 @@ public class TridentKafkaEmitter { private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) { long start = System.nanoTime(); - ByteBufferMessageSet msgs = null; - try { - msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset); - } catch (UpdateOffsetException e) { - long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config); - LOG.warn("OffsetOutOfRange, Updating offset from offset = " + offset + " to offset = " + newOffset); - msgs = KafkaUtils.fetchMessages(_config, consumer, partition, newOffset); - } + ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset); long end = System.nanoTime(); long millis = (end - start) / 1000000; _kafkaMeanFetchLatencyMetric.update(millis);
