[
https://issues.apache.org/jira/browse/SAMZA-86?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13831798#comment-13831798
]
Jakob Homan commented on SAMZA-86:
----------------------------------
Apparently satire doesn't communicate well over JIRA.
Thinking about this some more, I'd like to put the convert-to-Option point at
BrokerProxy.addTopicPartition. BrokerProxy and GetOffset are private
implementation details of the KafkaSystem, so it's a good place to enforce this
orthodoxy.
This means keeping lastReadOffset typed to [SSP, String] (with possible null
values) and wrapping the get call in an Option:
{code}
scala> val m = scala.collection.mutable.Map[String, String]()
m: scala.collection.mutable.Map[String,String] = Map()
scala> m.put("a", null)
res0: Option[String] = None
scala> m.put("b", "B")
res2: Option[String] = None
scala> m.get("a").flatMap(Option(_))
res3: Option[String] = None
scala> m.get("b").flatMap(Option(_))
res4: Option[String] = Some(B)
{code}
Practically, this means changing the iteration over the map in
KafkaSystemConsumer:refreshBrokers to be an iteration over the keys, pulling in
and converting the values as above. It would also be good to explicitly note
that nulls are ok at the definition of lastReadOffsets.
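As a rough sketch of that key-based iteration (not the actual KafkaSystemConsumer code): Ssp below is a hypothetical stand-in for the real SSP type, and OffsetOptions.toOptions is an illustrative helper name, not something that exists in Samza.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the SSP key type; the real class is not reproduced here.
final case class Ssp(system: String, stream: String, partition: Int)

object OffsetOptions {
  // Illustrative helper (an assumption, not Samza API): walk the keys of a map
  // whose values may be null and wrap each lookup in an Option, so that both a
  // missing key and a null value come out as None.
  def toOptions(offsets: mutable.Map[Ssp, String]): Map[Ssp, Option[String]] =
    offsets.keys.map { ssp =>
      ssp -> offsets.get(ssp).flatMap(Option(_))
    }.toMap
}
```

Callers downstream of this point would then only ever see Option[String] and never a bare null.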
This will allow the Option use to cascade into BrokerProxy and GetOffset. Does
that sound good, Rekha?
> GetOffset:getNextOffset needs some work
> ---------------------------------------
>
> Key: SAMZA-86
> URL: https://issues.apache.org/jira/browse/SAMZA-86
> Project: Samza
> Issue Type: Improvement
> Reporter: Jakob Homan
> Assignee: Rekha Joshi
> Attachments: SAMZA_86_1.patch
>
>
> {code}
> def getNextOffset(sc: SimpleConsumer with DefaultFetch, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = {
>   val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
>   val offsetResponse = sc.getOffsetsBefore(offsetRequest)
>   val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp))
>   val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp))
>   info("Got offset %d for topic and partition %s" format (autoOffset, tp))
>   val actualOffset = Option(lastCheckpointedOffset) match {
>     case Some(last) => useLastCheckpointedOffset(sc, last, tp).getOrElse(autoOffset)
>     case None => autoOffset
>   }
> {code}
> lastCheckpointedOffset being coerced into an option here sucks, particularly
> since at least one QA job is passing in a null as the value, which hits the
> Some case. It would be better to make the parameter an Option at the method
> level and be explicit about what needs to be passed in.