Jakob Homan created SAMZA-86:
--------------------------------
Summary: GetOffset:getNextOffset needs some work
Key: SAMZA-86
URL: https://issues.apache.org/jira/browse/SAMZA-86
Project: Samza
Issue Type: Improvement
Reporter: Jakob Homan
{code} def getNextOffset(sc: SimpleConsumer with DefaultFetch, tp:
TopicAndPartition, lastCheckpointedOffset: String): Long = {
val offsetRequest = new OffsetRequest(Map(tp -> new
PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
val offsetResponse = sc.getOffsetsBefore(offsetRequest)
val partitionOffsetResponse =
offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find
offset information for %s" format tp))
val autoOffset =
partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no
offsets defined for %s" format tp))
info("Got offset %d for topic and partition %s" format (autoOffset, tp))
val actualOffset = Option(lastCheckpointedOffset) match {
case Some(last) => useLastCheckpointedOffset(sc, last,
tp).getOrElse(autoOffset)
case None => autoOffset
}
{code}
lastCheckpointedOffset being coerced into an option here sucks, particularly
since at least one QA job is passing in a null as the value, which hits the
Some case. It would be better to make the parameter an Option at the method
level and be explicit about what needs to be passed in.
--
This message was sent by Atlassian JIRA
(v6.1#6144)