[ 
https://issues.apache.org/jira/browse/SAMZA-86?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13831798#comment-13831798
 ] 

Jakob Homan commented on SAMZA-86:
----------------------------------

Apparently satire doesn't communicate well over JIRA.

Thinking about this some more, I'd like the convert-to-Option point to be 
BrokerProxy.addTopicPartition.  BrokerProxy and GetOffset are private 
implementation details of the KafkaSystem, so it's a good place to enforce this 
orthodoxy.

This means keeping lastReadOffset typed to \[SSP, String] (with possible null 
values) and wrapping the result of the get call in an Option:
{code}scala> val m = scala.collection.mutable.Map[String, String]()
m: scala.collection.mutable.Map[String,String] = Map()

scala> m.put("a", null)
res0: Option[String] = None

scala> m.put("b", "B")
res2: Option[String] = None

scala> m.get("a").flatMap(Option(_))
res3: Option[String] = None

scala> m.get("b").flatMap(Option(_))
res4: Option[String] = Some(B){code}
Practically, this means changing the iteration over the map in 
KafkaSystemConsumer:refreshBrokers to an iteration over the keys, pulling in 
and converting each value as above.  It would also be good to explicitly note, 
at the definition of lastReadOffsets, that null values are OK.
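A minimal Scala sketch of that refreshBrokers change, under stated assumptions: SSP is a hypothetical case class standing in for SystemStreamPartition, and addTopicPartition is a hypothetical stand-in for BrokerProxy.addTopicPartition that just echoes its arguments. Only the key iteration and the null-to-Option conversion are meant to mirror the proposal.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Samza's SystemStreamPartition (assumption for this sketch).
final case class SSP(topic: String, partition: Int)

object RefreshBrokersSketch {
  // Values may be null: a null means "nothing read yet for this partition".
  val lastReadOffsets: mutable.Map[SSP, String] = mutable.Map.empty

  // Hypothetical stand-in for BrokerProxy.addTopicPartition: the point where
  // the offset is already an Option, so nothing downstream sees a null.
  def addTopicPartition(ssp: SSP, lastReadOffset: Option[String]): (SSP, Option[String]) =
    ssp -> lastReadOffset

  // Iterate over the keys and pull each value in as an Option, so a stored
  // null becomes None instead of Some(null).
  def refreshBrokers(): Map[SSP, Option[String]] =
    lastReadOffsets.keys.map { ssp =>
      addTopicPartition(ssp, lastReadOffsets.get(ssp).flatMap(Option(_)))
    }.toMap
}
```

Converting at this one boundary keeps the null-tolerant map private to the consumer while everything past addTopicPartition deals only in Option[String].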

This will allow the Option use to cascade into BrokerProxy and GetOffset.  Does 
that sound good, Rekha? 
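Once the Option reaches GetOffset, getNextOffset could take it directly instead of coercing a String. This is a sketch under simplifying assumptions: the broker lookup that produces autoOffset is replaced by a plain parameter, and useLastCheckpointedOffset is a hypothetical stand-in that only checks that the checkpoint parses as a non-negative Long (the real method talks to the broker).

```scala
object GetOffsetSketch {
  // Hypothetical stand-in for useLastCheckpointedOffset: accepts the checkpoint
  // only if it parses as a non-negative Long. The real one queries the broker.
  def useLastCheckpointedOffset(last: String): Option[Long] =
    scala.util.Try(last.toLong).toOption.filter(_ >= 0)

  // The parameter is an Option at the method level, so callers must state
  // explicitly whether a checkpointed offset exists; there is no null coercion here.
  def getNextOffset(autoOffset: Long, lastCheckpointedOffset: Option[String]): Long =
    lastCheckpointedOffset match {
      case Some(last) => useLastCheckpointedOffset(last).getOrElse(autoOffset)
      case None       => autoOffset
    }
}
```

A caller would pass lastReadOffsets.get(ssp).flatMap(Option(_)), so a stored null arrives as None and falls through to the auto offset.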

> GetOffset:getNextOffset needs some work
> ---------------------------------------
>
>                 Key: SAMZA-86
>                 URL: https://issues.apache.org/jira/browse/SAMZA-86
>             Project: Samza
>          Issue Type: Improvement
>            Reporter: Jakob Homan
>            Assignee: Rekha Joshi
>         Attachments: SAMZA_86_1.patch
>
>
> {code}
>   def getNextOffset(sc: SimpleConsumer with DefaultFetch, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = {
>     val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
>     val offsetResponse = sc.getOffsetsBefore(offsetRequest)
>     val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp))
>     val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp))
>     info("Got offset %d for topic and partition %s" format (autoOffset, tp))
>     val actualOffset = Option(lastCheckpointedOffset) match {
>       case Some(last) => useLastCheckpointedOffset(sc, last, tp).getOrElse(autoOffset)
>       case None => autoOffset
>     }
> {code}
> lastCheckpointedOffset being coerced into an option here sucks, particularly 
> since at least one QA job is passing in a null as the value, which hits the 
> Some case.  It would be better to make the parameter an Option at the method 
> level and be explicit about what needs to be passed in.



--
This message was sent by Atlassian JIRA
(v6.1#6144)
