[
https://issues.apache.org/jira/browse/SAMZA-86?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13845737#comment-13845737
]
Jakob Homan commented on SAMZA-86:
----------------------------------
This looks like it may not be right yet. Deploying today, I hit this on all
offset requests:
{noformat}
2013-12-11 20:14:30 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Received OffsetOutOfRange exception for [TOPICREDACTED,0]. Current offset = 40122820
2013-12-11 20:14:30 GetOffset [INFO] Checking if auto.offset.reset is defined for topic TOPICREDACTED
2013-12-11 20:14:30 GetOffset [INFO] Got reset of type largest.
2013-12-11 20:14:30 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Recreating simple consumer and retrying connection
2013-12-11 20:14:30 KafkaSystemConsumer$$anonfun$7$$anon$1 [WARN] Stack trace for fetchMessages exception.
scala.MatchError: null
    at org.apache.samza.system.kafka.GetOffset.getNextOffset(GetOffset.scala:99)
    at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$3.apply(BrokerProxy.scala:184)
    at org.apache.samza.system.kafka.BrokerProxy$$anonfun$handleErrors$3.apply(BrokerProxy.scala:180)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:61)
    at org.apache.samza.system.kafka.BrokerProxy.handleErrors(BrokerProxy.scala:180)
    at org.apache.samza.system.kafka.BrokerProxy.org$apache$samza$system$kafka$BrokerProxy$$fetchMessages(BrokerProxy.scala:141)
    at org.apache.samza.system.kafka.BrokerProxy$$anon$2.run(BrokerProxy.scala:110)
    at java.lang.Thread.run(Thread.java:619)
{noformat}
Reverting and deploying fixed the issue. I'd like to revert the patch from
trunk and take a look.
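The trace above does not show what line 99 of the patched GetOffset was matching on, so the following is only a general Scala illustration, not the Samza code. It sketches one common way to get a {{scala.MatchError: null}}: a null reference is neither {{Some}} nor {{None}}, so an Option match with only those two cases falls through. The {{describe}} helper and the object name are hypothetical.

```scala
object MatchErrorSketch {
  // Hypothetical helper, not Samza code: matches on an Option
  // without guarding against a null reference.
  def describe(offset: Option[String]): String = offset match {
    case Some(o) => "checkpointed offset " + o
    case None    => "no checkpoint"
  }

  def main(args: Array[String]): Unit = {
    println(describe(Some("40122820")))
    println(describe(None))
    try {
      // A null Option reference matches neither case above.
      describe(null)
    } catch {
      case e: MatchError => println("caught: " + e)
    }
  }
}
```

Wrapping a possibly-null value with {{Option(...)}} before matching avoids this, since {{Option(null)}} evaluates to {{None}}.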
> GetOffset:getNextOffset needs some work
> ---------------------------------------
>
> Key: SAMZA-86
> URL: https://issues.apache.org/jira/browse/SAMZA-86
> Project: Samza
> Issue Type: Improvement
> Reporter: Jakob Homan
> Assignee: Rekha Joshi
> Attachments: SAMZA-86.2.patch, SAMZA-86.3.patch, SAMZA-86.4.patch,
> SAMZA_86_1.patch
>
>
> {code}
> def getNextOffset(sc: SimpleConsumer with DefaultFetch, tp: TopicAndPartition, lastCheckpointedOffset: String): Long = {
>   val offsetRequest = new OffsetRequest(Map(tp -> new PartitionOffsetRequestInfo(getAutoOffset(tp.topic), 1)))
>   val offsetResponse = sc.getOffsetsBefore(offsetRequest)
>   val partitionOffsetResponse = offsetResponse.partitionErrorAndOffsets.get(tp).getOrElse(toss("Unable to find offset information for %s" format tp))
>   val autoOffset = partitionOffsetResponse.offsets.headOption.getOrElse(toss("Got response, but no offsets defined for %s" format tp))
>   info("Got offset %d for topic and partition %s" format (autoOffset, tp))
>   val actualOffset = Option(lastCheckpointedOffset) match {
>     case Some(last) => useLastCheckpointedOffset(sc, last, tp).getOrElse(autoOffset)
>     case None => autoOffset
>   }
> {code}
> lastCheckpointedOffset being coerced into an option here sucks, particularly
> since at least one QA job is passing in a null as the value, which hits the
> Some case. It would be better to make the parameter an Option at the method
> level and be explicit about what needs to be passed in.
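A minimal sketch of the suggestion in the description, moving the Option into the method signature so callers must say explicitly whether a checkpoint exists. Everything here is hypothetical and simplified: {{chooseOffset}} stands in for the tail of getNextOffset, {{parseCheckpoint}} stands in for useLastCheckpointedOffset (which in the real code also takes the consumer and the topic/partition), and {{fromNullable}} is an assumed boundary helper for callers that may hold a null.

```scala
object OptionParamSketch {
  // Hypothetical stand-in for useLastCheckpointedOffset: returns None
  // when the checkpointed value cannot be used as an offset.
  def parseCheckpoint(last: String): Option[Long] =
    try Some(last.toLong) catch { case _: NumberFormatException => None }

  // The parameter is an Option in the signature, so the method body
  // never has to coerce a possibly-null String itself.
  def chooseOffset(lastCheckpointedOffset: Option[String], autoOffset: Long): Long =
    lastCheckpointedOffset.flatMap(parseCheckpoint).getOrElse(autoOffset)

  // Callers holding a possibly-null String wrap it once at the boundary;
  // Option(null) evaluates to None.
  def fromNullable(raw: String): Option[String] = Option(raw)

  def main(args: Array[String]): Unit = {
    println(chooseOffset(Some("42"), 7L))
    println(chooseOffset(None, 7L))
    println(chooseOffset(fromNullable(null), 7L))
  }
}
```

With this shape, a caller that has no checkpoint passes {{None}} deliberately, and a null can only enter through the single wrapping point.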