Updated Branches: refs/heads/master dbe35de84 -> 4e6f1ca5d
SAMZA-62; retry on offset request failures in kafka checkpoint manager. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/965bc933 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/965bc933 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/965bc933 Branch: refs/heads/master Commit: 965bc9338ce8d59192e3abc350cfc7f62a87dbef Parents: dbe35de Author: Chris Riccomini <[email protected]> Authored: Wed Oct 23 11:11:23 2013 -0700 Committer: Chris Riccomini <[email protected]> Committed: Wed Oct 23 11:11:23 2013 -0700 ---------------------------------------------------------------------- .../samza/checkpoint/kafka/KafkaCheckpointManager.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/965bc933/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala ---------------------------------------------------------------------- diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala index a9ddc5c..27b38b2 100644 --- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala +++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala @@ -130,10 +130,15 @@ class KafkaCheckpointManager( bufferSize, clientId) val topicAndPartition = new TopicAndPartition(stateTopic, partitionId) - val offset = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))) + val offsetResponse = consumer.getOffsetsBefore(new OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))) .partitionErrorAndOffsets .get(topicAndPartition) .getOrElse(throw new KafkaCheckpointException("Unable to find offset information for %s:%d" format (stateTopic, partitionId))) + + // Fail or retry if there was an an issue with the offset request. + ErrorMapping.maybeThrowException(offsetResponse.error) + + val offset = offsetResponse .offsets .headOption .getOrElse(throw new KafkaCheckpointException("Got response, but no offsets defined for %s:%d" format (stateTopic, partitionId))) @@ -147,7 +152,7 @@ class KafkaCheckpointManager( val request = new FetchRequestBuilder() // Kafka returns 1 greater than the offset of the last message in - //the topic, so subtract one to fetch the last message. + // the topic, so subtract one to fetch the last message. .addFetch(stateTopic, partitionId, offset - 1, fetchSize) .maxWait(500) .minBytes(1)
