STORM-511 update startOffset in PartitionManager when the offset submitted in the fetch request is out of range
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/95abbc50 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/95abbc50 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/95abbc50 Branch: refs/heads/security Commit: 95abbc501c8f29d08dcd1e6200e7ded65c41512e Parents: a56ccc7 Author: Viktor Taranenko <[email protected]> Authored: Sat Sep 27 15:24:15 2014 +0100 Committer: Viktor Taranenko <[email protected]> Committed: Sat Sep 27 15:24:15 2014 +0100 ---------------------------------------------------------------------- .../src/jvm/storm/kafka/KafkaUtils.java | 64 ++++++++++---------- .../src/jvm/storm/kafka/PartitionManager.java | 8 ++- .../jvm/storm/kafka/UpdateOffsetException.java | 10 +++ .../src/test/storm/kafka/KafkaUtilsTest.java | 6 +- 4 files changed, 50 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/95abbc50/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java index db7ce3a..eab80eb 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java @@ -155,45 +155,43 @@ public class KafkaUtils { } } - public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) { + public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) throws UpdateOffsetException { ByteBufferMessageSet msgs = null; String topic = config.topic; int partitionId = partition.partition; - for (int errors = 0; errors < 2 && msgs == null; errors++) { - FetchRequestBuilder builder = new FetchRequestBuilder(); - 
FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes). - clientId(config.clientId).maxWait(config.fetchMaxWait).build(); - FetchResponse fetchResponse; - try { - fetchResponse = consumer.fetch(fetchRequest); - } catch (Exception e) { - if (e instanceof ConnectException || - e instanceof SocketTimeoutException || - e instanceof IOException || - e instanceof UnresolvedAddressException - ) { - LOG.warn("Network error when fetching messages:", e); - throw new FailedFetchException(e); - } else { - throw new RuntimeException(e); - } + FetchRequestBuilder builder = new FetchRequestBuilder(); + FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes). + clientId(config.clientId).maxWait(config.fetchMaxWait).build(); + FetchResponse fetchResponse; + try { + fetchResponse = consumer.fetch(fetchRequest); + } catch (Exception e) { + if (e instanceof ConnectException || + e instanceof SocketTimeoutException || + e instanceof IOException || + e instanceof UnresolvedAddressException + ) { + LOG.warn("Network error when fetching messages:", e); + throw new FailedFetchException(e); + } else { + throw new RuntimeException(e); } - if (fetchResponse.hasError()) { - KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId)); - if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) { - long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime); - LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " + - "retrying with default start offset time from configuration. 
" + - "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]"); - offset = startOffset; - } else { - String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]"; - LOG.error(message); - throw new FailedFetchException(message); - } + } + if (fetchResponse.hasError()) { + KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId)); + if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) { + long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime); + LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " + + "retrying with default start offset time from configuration. " + + "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]"); + throw new UpdateOffsetException(startOffset); } else { - msgs = fetchResponse.messageSet(topic, partitionId); + String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]"; + LOG.error(message); + throw new FailedFetchException(message); } + } else { + msgs = fetchResponse.messageSet(topic, partitionId); } return msgs; } http://git-wip-us.apache.org/repos/asf/storm/blob/95abbc50/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index 9b48678..c228c19 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -27,6 +27,7 @@ import backtype.storm.utils.Utils; import com.google.common.collect.ImmutableMap; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; +import kafka.message.ByteBufferMessageSet$; import 
kafka.message.MessageAndOffset; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,7 +157,12 @@ public class PartitionManager { offset = _emittedToOffset; } - ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset); + ByteBufferMessageSet msgs = null; + try { + msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset); + } catch (UpdateOffsetException e) { + offset = e.startOffset; + } long end = System.nanoTime(); long millis = (end - start) / 1000000; _fetchAPILatencyMax.update(millis); http://git-wip-us.apache.org/repos/asf/storm/blob/95abbc50/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java new file mode 100644 index 0000000..69d8950 --- /dev/null +++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java @@ -0,0 +1,10 @@ +package storm.kafka; + +public class UpdateOffsetException extends RuntimeException { + + public final Long startOffset; + + public UpdateOffsetException(Long _offset) { + this.startOffset = _offset; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/95abbc50/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java index b993843..34c9e1a 100644 --- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java +++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java @@ -99,15 +99,13 @@ public class KafkaUtilsTest { new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), -99); } - @Test + @Test(expected = UpdateOffsetException.class) public void 
fetchMessagesWithInvalidOffsetAndDefaultHandlingEnabled() throws Exception { config = new KafkaConfig(brokerHosts, "newTopic"); String value = "test"; createTopicAndSendMessage(value); - ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(config, simpleConsumer, + KafkaUtils.fetchMessages(config, simpleConsumer, new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), -99); - String message = new String(Utils.toByteArray(messageAndOffsets.iterator().next().message().payload())); - assertThat(message, is(equalTo(value))); } @Test
