move new offset calculation to PartitionManager; Don't update metrics on failed 
fetchå


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/67b5f56c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/67b5f56c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/67b5f56c

Branch: refs/heads/security
Commit: 67b5f56c1ff3905c88eb85dbb8985c7bb8342de9
Parents: 1897dee
Author: P. Taylor Goetz <[email protected]>
Authored: Thu Oct 23 16:27:07 2014 -0400
Committer: P. Taylor Goetz <[email protected]>
Committed: Thu Oct 23 16:27:07 2014 -0400

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java        | 5 ++---
 external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java  | 5 ++++-
 .../storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java  | 5 -----
 3 files changed, 6 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/67b5f56c/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java 
b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index eab80eb..918da74 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,11 +180,10 @@ public class KafkaUtils {
         if (fetchResponse.hasError()) {
             KafkaError error = 
KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
             if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && 
config.useStartOffsetTimeIfOffsetOutOfRange) {
-                long startOffset = getOffset(consumer, topic, partitionId, 
config.startOffsetTime);
                 LOG.warn("Got fetch request with offset out of range: [" + 
offset + "]; " +
                         "retrying with default start offset time from 
configuration. " +
-                        "configured start offset time: [" + 
config.startOffsetTime + "] offset: [" + startOffset + "]");
-                throw new UpdateOffsetException(startOffset);
+                        "configured start offset time: [" + 
config.startOffsetTime + "]");
+                throw new UpdateOffsetException();
             } else {
                 String message = "Error fetching data from [" + partition + "] 
for topic [" + topic + "]: [" + error + "]";
                 LOG.error(message);

http://git-wip-us.apache.org/repos/asf/storm/blob/67b5f56c/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java 
b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index fa5f7e5..d24a49e 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -161,7 +161,10 @@ public class PartitionManager {
         try {
             msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, 
_partition, offset);
         } catch (UpdateOffsetException e) {
-            _emittedToOffset = e.startOffset;
+            _emittedToOffset = KafkaUtils.getOffset(_consumer, 
_spoutConfig.topic, _partition.partition, _spoutConfig);
+            LOG.warn("Using new offset: {}", _emittedToOffset);
+            // fetch failed, so don't update the metrics
+            return;
         }
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;

http://git-wip-us.apache.org/repos/asf/storm/blob/67b5f56c/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java 
b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
index 69d8950..510c8cd 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
@@ -2,9 +2,4 @@ package storm.kafka;
 
 public class UpdateOffsetException extends RuntimeException {
 
-    public final Long startOffset;
-
-    public UpdateOffsetException(Long _offset) {
-        this.startOffset = _offset;
-    }
 }

Reply via email to