[
https://issues.apache.org/jira/browse/SAMZA-86?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13830192#comment-13830192
]
Chris Riccomini commented on SAMZA-86:
--------------------------------------
Yeah, this styling is somewhat debatable, since we don't have a clearly defined
style guide for this case.
bq. The contention seems we are NOT expecting null lastCheckpointedOffset at
all but it does show up, then wrapping it as Option is bad citizenship.
We actually are expecting null here. Null is legally returned by
CheckpointManager in cases where there is no checkpoint. Returning a null means
"I defer to the default offset". For Kafka, this means either starting from the
earliest or latest offset of the stream.
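As a minimal sketch of that convention: wrapping a possibly-null checkpoint in Option turns null into None, which then falls through to the default. The names resolveOffset and defaultOffset are hypothetical, not Samza APIs, and the real code validates the checkpoint through useLastCheckpointedOffset rather than parsing it directly.

```scala
object NullCheckpointSketch {
  // Hypothetical helper, for illustration only: a null checkpoint means
  // "I defer to the default offset".
  def resolveOffset(lastCheckpointedOffset: String, defaultOffset: Long): Long =
    Option(lastCheckpointedOffset) match {
      case Some(last) => last.toLong   // a checkpoint exists, so resume from it
      case None       => defaultOffset // null was passed, so use the default
    }
}
```

Note that Option(null) evaluates to None, whereas Some(null) would keep the null wrapped inside a Some.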
Regarding styling, I think we should stick to Samza's style guide
(http://samza.incubator.apache.org/contribute/coding-guide.html):
bq. Prefer Option to null in scala APIs.
This means we'd have to switch to using Option[String] for offsets all the way
up through KafkaSystemConsumer.register. We can't go farther than this because
the register() method is a Java-based API. If we do the translation of
lastReadOffset to an Option[String] in KafkaSystemConsumer.register, then we'll
need to update KafkaSystemConsumer.lastReadOffsets,
KafkaSystemConsumer.refreshBrokers, BrokerProxy.addTopicPartition, and
GetOffset.getNextOffset accordingly.
This would make lastReadOffsets look like:
{code}
var lastReadOffsets = Map[SystemStreamPartition, Option[String]]()
{code}
I'm not so crazy about nested Option classes:
{noformat}
scala> Map[String, Option[String]]().get("test")
res1: Option[Option[String]] = None
{noformat}
For that reason, I'd prefer to leave KafkaSystemConsumer.lastReadOffsets as is,
and do the String to Option[String] translation in
KafkaSystemConsumer.refreshBrokers when BrokerProxy.addTopicPartition is called
(i.e. update BrokerProxy.addTopicPartition to take an Option[String] for
lastCheckpointedOffset).
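Roughly, the translation I have in mind would look like the sketch below. This is not the actual Samza code: String keys stand in for SystemStreamPartition, and addTopicPartition is a hypothetical stub that just reports what it was given.

```scala
object OffsetTranslationSketch {
  // Stand-in for KafkaSystemConsumer.lastReadOffsets, left as a map to plain
  // (possibly null) Strings. String keys replace SystemStreamPartition only
  // to keep this sketch self-contained.
  var lastReadOffsets = Map[String, String]()

  // Hypothetical stub for BrokerProxy.addTopicPartition taking an Option.
  def addTopicPartition(tp: String, lastCheckpointedOffset: Option[String]): String =
    lastCheckpointedOffset
      .map(offset => s"$tp resumes from $offset")
      .getOrElse(s"$tp uses default offset")

  // The String to Option[String] translation happens once, at the call site.
  def refreshBrokers(): Seq[String] =
    lastReadOffsets.toSeq.sortBy(_._1).map {
      case (tp, offset) => addTopicPartition(tp, Option(offset))
    }
}
```

With this shape the map itself never holds an Option, so lookups stay a single Option deep, and everything from addTopicPartition downward sees an explicit Option[String].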
[~jghoman] what do you think?
> GetOffset:getNextOffset needs some work
> ---------------------------------------
>
> Key: SAMZA-86
> URL: https://issues.apache.org/jira/browse/SAMZA-86
> Project: Samza
> Issue Type: Improvement
> Reporter: Jakob Homan
> Assignee: Rekha Joshi
> Attachments: SAMZA_86_1.patch
>
>
> {code}
> def getNextOffset(sc: SimpleConsumer with DefaultFetch, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = {
>   val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
>   val offsetResponse = sc.getOffsetsBefore(offsetRequest)
>   val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp))
>   val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp))
>   info("Got offset %d for topic and partition %s" format (autoOffset, tp))
>   val actualOffset = Option(lastCheckpointedOffset) match {
>     case Some(last) => useLastCheckpointedOffset(sc, last, tp).getOrElse(autoOffset)
>     case None => autoOffset
>   }
> {code}
> lastCheckpointedOffset being coerced into an option here sucks, particularly
> since at least one QA job is passing in a null as the value, which hits the
> Some case. It would be better to make the parameter an Option at the method
> level and be explicit about what needs to be passed in.