Jakob Homan created SAMZA-86:
--------------------------------

             Summary: GetOffset:getNextOffset needs some work
                 Key: SAMZA-86
                 URL: https://issues.apache.org/jira/browse/SAMZA-86
             Project: Samza
          Issue Type: Improvement
            Reporter: Jakob Homan


{code}
  def getNextOffset(sc: SimpleConsumer with DefaultFetch, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = {
    val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
    val offsetResponse = sc.getOffsetsBefore(offsetRequest)
    val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp))
    val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp))

    info("Got offset %d for topic and partition %s" format (autoOffset, tp))

    val actualOffset = Option(lastCheckpointedOffset) match {
      case Some(last) => useLastCheckpointedOffset(sc, last, tp).getOrElse(autoOffset)
      case None => autoOffset
    }
{code}
Coercing lastCheckpointedOffset into an Option here sucks, particularly 
since at least one QA job is passing in a null as the value, which hits the 
Some case. It would be better to make the parameter an Option at the method 
level and be explicit about what needs to be passed in.
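
A rough sketch of what the proposed signature change could look like, with the Kafka calls stripped out so it runs on its own. Everything here is hypothetical: NextOffsetSketch, chooseOffset, parseCheckpoint and fromLegacy are illustrative names, not Samza's API, and parseCheckpoint only stands in for useLastCheckpointedOffset (it is assumed to return None when the checkpoint is unusable).

```scala
// Hypothetical, Kafka-free sketch; none of these names exist in Samza.
object NextOffsetSketch {
  // Stand-in for useLastCheckpointedOffset. Assumption for illustration:
  // an unusable checkpoint yields None so the caller can fall back.
  def parseCheckpoint(last: String): Option[Long] =
    scala.util.Try(last.toLong).toOption

  // The checkpoint is an explicit Option at the method level, so callers
  // state "no checkpoint" with None instead of passing null.
  def chooseOffset(lastCheckpointedOffset: Option[String], autoOffset: Long): Long =
    lastCheckpointedOffset.flatMap(parseCheckpoint).getOrElse(autoOffset)

  // Callers that may still hold a null can wrap it at the call site;
  // Option(null) evaluates to None in Scala.
  def fromLegacy(raw: String): Option[String] = Option(raw)
}
```

With a signature along these lines, a caller would write chooseOffset(Some("42"), autoOffset) or chooseOffset(None, autoOffset), and the wrapping of a possibly-null value happens visibly at the call site rather than inside the method.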



--
This message was sent by Atlassian JIRA
(v6.1#6144)
