STORM-511 update startOffset in PartitionManager when the offset submitted
in the fetch request is out of range


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/95abbc50
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/95abbc50
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/95abbc50

Branch: refs/heads/security
Commit: 95abbc501c8f29d08dcd1e6200e7ded65c41512e
Parents: a56ccc7
Author: Viktor Taranenko <[email protected]>
Authored: Sat Sep 27 15:24:15 2014 +0100
Committer: Viktor Taranenko <[email protected]>
Committed: Sat Sep 27 15:24:15 2014 +0100

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/KafkaUtils.java         | 64 ++++++++++----------
 .../src/jvm/storm/kafka/PartitionManager.java   |  8 ++-
 .../jvm/storm/kafka/UpdateOffsetException.java  | 10 +++
 .../src/test/storm/kafka/KafkaUtilsTest.java    |  6 +-
 4 files changed, 50 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/95abbc50/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java 
b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index db7ce3a..eab80eb 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -155,45 +155,43 @@ public class KafkaUtils {
         }
     }
 
-    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, 
SimpleConsumer consumer, Partition partition, long offset) {
+    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, 
SimpleConsumer consumer, Partition partition, long offset) throws 
UpdateOffsetException {
         ByteBufferMessageSet msgs = null;
         String topic = config.topic;
         int partitionId = partition.partition;
-        for (int errors = 0; errors < 2 && msgs == null; errors++) {
-            FetchRequestBuilder builder = new FetchRequestBuilder();
-            FetchRequest fetchRequest = builder.addFetch(topic, partitionId, 
offset, config.fetchSizeBytes).
-                    
clientId(config.clientId).maxWait(config.fetchMaxWait).build();
-            FetchResponse fetchResponse;
-            try {
-                fetchResponse = consumer.fetch(fetchRequest);
-            } catch (Exception e) {
-                if (e instanceof ConnectException ||
-                        e instanceof SocketTimeoutException ||
-                        e instanceof IOException ||
-                        e instanceof UnresolvedAddressException
-                        ) {
-                    LOG.warn("Network error when fetching messages:", e);
-                    throw new FailedFetchException(e);
-                } else {
-                    throw new RuntimeException(e);
-                }
+        FetchRequestBuilder builder = new FetchRequestBuilder();
+        FetchRequest fetchRequest = builder.addFetch(topic, partitionId, 
offset, config.fetchSizeBytes).
+                clientId(config.clientId).maxWait(config.fetchMaxWait).build();
+        FetchResponse fetchResponse;
+        try {
+            fetchResponse = consumer.fetch(fetchRequest);
+        } catch (Exception e) {
+            if (e instanceof ConnectException ||
+                    e instanceof SocketTimeoutException ||
+                    e instanceof IOException ||
+                    e instanceof UnresolvedAddressException
+                    ) {
+                LOG.warn("Network error when fetching messages:", e);
+                throw new FailedFetchException(e);
+            } else {
+                throw new RuntimeException(e);
             }
-            if (fetchResponse.hasError()) {
-                KafkaError error = 
KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
-                if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && 
config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
-                    long startOffset = getOffset(consumer, topic, partitionId, 
config.startOffsetTime);
-                    LOG.warn("Got fetch request with offset out of range: [" + 
offset + "]; " +
-                            "retrying with default start offset time from 
configuration. " +
-                            "configured start offset time: [" + 
config.startOffsetTime + "] offset: [" + startOffset + "]");
-                    offset = startOffset;
-                } else {
-                    String message = "Error fetching data from [" + partition 
+ "] for topic [" + topic + "]: [" + error + "]";
-                    LOG.error(message);
-                    throw new FailedFetchException(message);
-                }
+        }
+        if (fetchResponse.hasError()) {
+            KafkaError error = 
KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
+            if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && 
config.useStartOffsetTimeIfOffsetOutOfRange) {
+                long startOffset = getOffset(consumer, topic, partitionId, 
config.startOffsetTime);
+                LOG.warn("Got fetch request with offset out of range: [" + 
offset + "]; " +
+                        "retrying with default start offset time from 
configuration. " +
+                        "configured start offset time: [" + 
config.startOffsetTime + "] offset: [" + startOffset + "]");
+                throw new UpdateOffsetException(startOffset);
             } else {
-                msgs = fetchResponse.messageSet(topic, partitionId);
+                String message = "Error fetching data from [" + partition + "] 
for topic [" + topic + "]: [" + error + "]";
+                LOG.error(message);
+                throw new FailedFetchException(message);
             }
+        } else {
+            msgs = fetchResponse.messageSet(topic, partitionId);
         }
         return msgs;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/95abbc50/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java 
b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 9b48678..c228c19 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -27,6 +27,7 @@ import backtype.storm.utils.Utils;
 import com.google.common.collect.ImmutableMap;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.ByteBufferMessageSet$;
 import kafka.message.MessageAndOffset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -156,7 +157,12 @@ public class PartitionManager {
             offset = _emittedToOffset;
         }
 
-        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, 
_consumer, _partition, offset);
+        ByteBufferMessageSet msgs = null;
+        try {
+            msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, 
_partition, offset);
+        } catch (UpdateOffsetException e) {
+            offset = e.startOffset;
+        }
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _fetchAPILatencyMax.update(millis);

http://git-wip-us.apache.org/repos/asf/storm/blob/95abbc50/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java 
b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
new file mode 100644
index 0000000..69d8950
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
@@ -0,0 +1,10 @@
+package storm.kafka;
+
+public class UpdateOffsetException extends RuntimeException {
+
+    public final Long startOffset;
+
+    public UpdateOffsetException(Long _offset) {
+        this.startOffset = _offset;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/95abbc50/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java 
b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index b993843..34c9e1a 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -99,15 +99,13 @@ public class KafkaUtilsTest {
                 new 
Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), -99);
     }
 
-    @Test
+    @Test(expected = UpdateOffsetException.class)
     public void fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() 
throws Exception {
         config = new KafkaConfig(brokerHosts, "newTopic");
         String value = "test";
         createTopicAndSendMessage(value);
-        ByteBufferMessageSet messageAndOffsets = 
KafkaUtils.fetchMessages(config, simpleConsumer,
+        KafkaUtils.fetchMessages(config, simpleConsumer,
                 new 
Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), -99);
-        String message = new 
String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload()));
-        assertThat(message, is(equalTo(value)));
     }
 
     @Test

Reply via email to