[ https://issues.apache.org/jira/browse/SAMZA-86?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Riccomini reopened SAMZA-86:
----------------------------------
> GetOffset:getNextOffset needs some work
> ---------------------------------------
>
> Key: SAMZA-86
> URL: https://issues.apache.org/jira/browse/SAMZA-86
> Project: Samza
> Issue Type: Improvement
> Reporter: Jakob Homan
> Assignee: Rekha Joshi
> Attachments: SAMZA-86.2.patch, SAMZA-86.3.patch, SAMZA-86.4.patch,
> SAMZA_86_1.patch
>
>
> {code}
> def getNextOffset(sc: SimpleConsumer with DefaultFetch, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = {
>   val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
>   val offsetResponse = sc.getOffsetsBefore(offsetRequest)
>   val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp))
>   val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp))
>   info("Got offset %d for topic and partition %s" format (autoOffset, tp))
>   val actualOffset = Option(lastCheckpointedOffset) match {
>     case Some(last) => useLastCheckpointedOffset(sc, last, tp).getOrElse(autoOffset)
>     case None => autoOffset
>   }
> {code}
> lastCheckpointedOffset being coerced into an option here sucks, particularly
> since at least one QA job is passing in a null as the value, which hits the
> Some case. It would be better to make the parameter an Option at the method
> level and be explicit about what needs to be passed in.
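
For illustration, here is a minimal sketch of what an Option-typed parameter could look like. It is not the patch attached to this issue. The object name OffsetSketch and the helpers fetchAutoOffset and resolveCheckpoint are hypothetical stand-ins for the Kafka lookup and for useLastCheckpointedOffset, so that the snippet runs without a broker. Since Option(null) is None in Scala, the sketch assumes the problematic value arrives as a non-null placeholder such as the string "null".

```scala
import scala.util.Try

// Hypothetical sketch: every name here except getNextOffset is an assumption.
object OffsetSketch {
  // Stand-in for the getOffsetsBefore lookup; a real version would query the broker.
  def fetchAutoOffset(topic: String): Long = 0L

  // Stand-in for useLastCheckpointedOffset: None when the checkpoint cannot be used.
  def resolveCheckpoint(last: String): Option[Long] = Try(last.toLong).toOption

  // The caller now states explicitly whether a checkpoint exists.
  def getNextOffset(topic: String, lastCheckpointedOffset: Option[String]): Long = {
    val autoOffset = fetchAutoOffset(topic)
    // An unusable or absent checkpoint falls back to the auto offset.
    lastCheckpointedOffset.flatMap(resolveCheckpoint).getOrElse(autoOffset)
  }
}
```

A caller with no checkpoint would pass None, and a caller with one would pass Some("42"). With this signature, a placeholder string like "null" has to be wrapped in Some deliberately, which makes the mistake visible at the call site rather than inside the method.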