Revert "STORM-616 : removing unintended changes."

This reverts commit d260759ac203383e27668a7cb7090926029f7406.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ca235e6c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ca235e6c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ca235e6c

Branch: refs/heads/master
Commit: ca235e6cb18006bbbac56361639309e73c196718
Parents: 079deda
Author: Parth Brahmbhatt <[email protected]>
Authored: Tue Jan 6 17:43:58 2015 -0500
Committer: Parth Brahmbhatt <[email protected]>
Committed: Tue Jan 6 17:43:58 2015 -0500

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java  |  7 ++++---
 .../src/jvm/storm/kafka/UpdateOffsetException.java        |  5 ++++-
 .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java  | 10 +++++++++-
 3 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java 
b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 918da74..3165189 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,10 +180,11 @@ public class KafkaUtils {
         if (fetchResponse.hasError()) {
             KafkaError error = 
KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
             if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && 
config.useStartOffsetTimeIfOffsetOutOfRange) {
-                LOG.warn("Got fetch request with offset out of range: [" + 
offset + "]; " +
+                String msg = "Got fetch request with offset out of range: [" + 
offset + "]; " +
                         "retrying with default start offset time from 
configuration. " +
-                        "configured start offset time: [" + 
config.startOffsetTime + "]");
-                throw new UpdateOffsetException();
+                        "configured start offset time: [" + 
config.startOffsetTime + "]";
+                LOG.warn(msg);
+                throw new UpdateOffsetException(msg);
             } else {
                 String message = "Error fetching data from [" + partition + "] 
for topic [" + topic + "]: [" + error + "]";
                 LOG.error(message);

http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java 
b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
index 1be7312..5c366ec 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
@@ -17,6 +17,9 @@
  */
 package storm.kafka;
 
-public class UpdateOffsetException extends RuntimeException {
+public class UpdateOffsetException extends FailedFetchException {
 
+    public UpdateOffsetException(String message) {
+        super(message);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java 
b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 94bf134..34566c5 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -33,6 +33,7 @@ import storm.kafka.DynamicPartitionConnections;
 import storm.kafka.FailedFetchException;
 import storm.kafka.KafkaUtils;
 import storm.kafka.Partition;
+import storm.kafka.UpdateOffsetException;
 import storm.trident.operation.TridentCollector;
 import storm.trident.spout.IOpaquePartitionedTridentSpout;
 import storm.trident.spout.IPartitionedTridentSpout;
@@ -129,7 +130,14 @@ public class TridentKafkaEmitter {
 
     private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, 
Partition partition, long offset) {
         long start = System.nanoTime();
-        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, 
consumer, partition, offset);
+        ByteBufferMessageSet msgs = null;
+        try {
+            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, 
offset);
+        } catch (UpdateOffsetException e) {
+            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, 
partition.partition, _config);
+            LOG.warn("OffsetOutOfRange, Updating offset from offset = " + 
offset + " to offset = " + newOffset);
+            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, 
newOffset);
+        }
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _kafkaMeanFetchLatencyMetric.update(millis);

Reply via email to