[ 
https://issues.apache.org/jira/browse/SAMZA-86?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13845737#comment-13845737
 ] 

Jakob Homan commented on SAMZA-86:
----------------------------------

This looks like it may not be right yet.  Deploying today, I hit this on all 
offset requests:
{noformat}2013-12-11 20:14:30 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] 
Received OffsetOutOfRange exception for [TOPICREDACTED,0]. Current offset = 
40122820
2013-12-11 20:14:30 GetOffset [INFO] Checking if auto.offset.reset is defined 
for topic TOPICREDACTED
2013-12-11 20:14:30 GetOffset [INFO] Got reset of type largest.
2013-12-11 20:14:30 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating 
simple consumer and retrying connection
2013-12-11 20:14:30 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace 
for fetchMessages exception.
scala.MatchError: null
  at org.apache.samza.system.kafka.GetOffset.getNextOffset(GetOffset.scala:99)
  at 
org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$3.apply(BrokerProxy.scala:184)
  at 
org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$3.apply(BrokerProxy.scala:180)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
  at 
org.apache.samza.system.kafka.BrokerProxy.handleErrors(BrokerProxy.scala:180)
  at 
org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:141)
  at 
org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
  at java.lang.Thread.run(Thread.java:619)
{noformat}
Reverting and deploying fixed the issue.  I'd like to revert the patch from 
trunk and take a look.


> GetOffset:getNextOffset needs some work
> ---------------------------------------
>
>                 Key: SAMZA-86
>                 URL: https://issues.apache.org/jira/browse/SAMZA-86
>             Project: Samza
>          Issue Type: Improvement
>            Reporter: Jakob Homan
>            Assignee: Rekha Joshi
>         Attachments: SAMZA-86.2.patch, SAMZA-86.3.patch, SAMZA-86.4.patch, 
> SAMZA_86_1.patch
>
>
> {code}  def getNextOffset(sc: SimpleConsumer with DefaultFetch, tp: 
> TopicAndPartition, lastCheckpointedOffset: String): Long = {
>     val offsetRequest = new OffsetRequest(Map(tp -> new 
> PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
>     val offsetResponse = sc.getOffsetsBefore(offsetRequest)
>     val partitionOffsetResponse = 
> offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to 
> find offset information for %s" format tp))
>     val autoOffset = 
> partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but 
> no offsets defined for %s" format tp))
>     info("Got offset %d for topic and partition %s" format (autoOffset, tp))
>     val actualOffset = Option(lastCheckpointedOffset) match {
>       case Some(last) => useLastCheckpointedOffset(sc, last, 
> tp).getOrElse(autoOffset)
>       case None => autoOffset
>     }
> {code} 
> lastCheckpointedOffset being coerced into an option here sucks, particularly 
> since at least one QA job is passing in a null as the value, which hits the 
> Some case.  It would be better to make the parameter an Option at the method 
> level and be explicit about what needs to be passed in.



--
This message was sent by Atlassian JIRA
(v6.1.4#6159)

Reply via email to