Updated Branches:
  refs/heads/master dbe35de84 -> 4e6f1ca5d

SAMZA-62; retry on offset request failures in kafka checkpoint manager.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/965bc933
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/965bc933
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/965bc933

Branch: refs/heads/master
Commit: 965bc9338ce8d59192e3abc350cfc7f62a87dbef
Parents: dbe35de
Author: Chris Riccomini <[email protected]>
Authored: Wed Oct 23 11:11:23 2013 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Wed Oct 23 11:11:23 2013 -0700

----------------------------------------------------------------------
 .../samza/checkpoint/kafka/KafkaCheckpointManager.scala     | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/965bc933/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index a9ddc5c..27b38b2 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -130,10 +130,15 @@ class KafkaCheckpointManager(
           bufferSize,
           clientId)
         val topicAndPartition = new TopicAndPartition(stateTopic, partitionId)
-        val offset = consumer.getOffsetsBefore(new 
OffsetRequest(Map(topicAndPartition -> 
PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
+        val offsetResponse = consumer.getOffsetsBefore(new 
OffsetRequest(Map(topicAndPartition -> 
PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1))))
           .partitionErrorAndOffsets
           .get(topicAndPartition)
           .getOrElse(throw new KafkaCheckpointException("Unable to find offset 
information for %s:%d" format (stateTopic, partitionId)))
+
+        // Fail or retry if there was an an issue with the offset request.
+        ErrorMapping.maybeThrowException(offsetResponse.error)
+
+        val offset = offsetResponse
           .offsets
           .headOption
           .getOrElse(throw new KafkaCheckpointException("Got response, but no 
offsets defined for %s:%d" format (stateTopic, partitionId)))
@@ -147,7 +152,7 @@ class KafkaCheckpointManager(
 
         val request = new FetchRequestBuilder()
           // Kafka returns 1 greater than the offset of the last message in 
-          //the topic, so subtract one to fetch the last message.
+          // the topic, so subtract one to fetch the last message.
           .addFetch(stateTopic, partitionId, offset - 1, fetchSize)
           .maxWait(500)
           .minBytes(1)

Reply via email to