[ https://issues.apache.org/jira/browse/SAMZA-1822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16594266#comment-16594266 ]

Yi Pan (Data Infrastructure) commented on SAMZA-1822:
-----------------------------------------------------

Added a unit test, merged and submitted. Thanks!

> Samza 0.14.1 not correctly handling OffsetOutOfRangeException exception
> -----------------------------------------------------------------------
>
>                 Key: SAMZA-1822
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1822
>             Project: Samza
>          Issue Type: Bug
>    Affects Versions: 0.14.0, 0.14.1
>            Reporter: DM
>            Priority: Major
>             Fix For: 1.0
>
>         Attachments: patch.txt
>
>
>  
> Samza is requesting a Kafka partition offset that is too old (i.e. the Kafka
> log has moved ahead). We set the property {{consumer.auto.offset.reset}}
> to smallest and therefore expect Samza to reset its checkpoint to the
> earliest available partition offset in such a scenario. That is not
> happening; instead, we continually get exceptions of this form:
> {code:java}
> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
> kafka.producer.SyncProducer:[Logging_class:info:66] - [main] -
> Disconnecting from vrni-platform-release:9092
> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658]
> system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset
> 56443499 for topic and partition Topic3-0
> WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658]
> system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While
> refreshing brokers for Topic3-0:
> org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested
> offset is not within the range of offsets maintained by the server..
> Retrying
> {code}
> Browsing through the code, it appears that {{GetOffset::isValidOffset}}
> should catch the {{OffsetOutOfRangeException}} and convert it to a false
> value, but this is not happening. The cause appears to be a package mismatch:
> the [GetOffset|https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala#L56]
> class catches {{kafka.common.OffsetOutOfRangeException}} (via
> {{import kafka.common.OffsetOutOfRangeException}}), but the logs show that the
> exception actually thrown is {{org.apache.kafka.common.errors.OffsetOutOfRangeException}}.
> {code:java}
> def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
>     info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
>     try {
>       val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
>       if (messages.hasError) {
>         KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
>       }
>       info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
>       true
>     } catch {
>       // Matches only kafka.common.OffsetOutOfRangeException, per the import in GetOffset.scala
>       case e: OffsetOutOfRangeException => false
>     }
> }
> {code}
> Also, the
> [BrokerProxy|https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L85]
> class, which calls {{GetOffset}}, logs a warning starting with {{It appears that...}}
> when it gets a false value, but this line never shows up in our logs. That
> indicates that an exception thrown in the {{GetOffset}} method is going uncaught
> and propagating up:
> {code:java}
> def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
>     debug("Adding new topic and partition %s to queue for %s" format (tp, host))
>     if (nextOffsets.asJava.containsKey(tp)) {
>       toss("Already consuming TopicPartition %s" format tp)
>     }
>     val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
>       nextOffset
>         .get
>         .toLong
>     } else {
>       warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
>       offsetGetter.getResetOffset(simpleConsumer, tp)
>     }
>     debug("Got offset %s for new topic and partition %s." format (offset, tp))
>     nextOffsets += tp -> offset
>     metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
> }
> {code}
> The above is caused by
> [KIP-109|https://cwiki.apache.org/confluence/display/KAFKA/KIP-109%3A+Old+Consumer+Deprecation]
> and its related [JIRA|http://example.com/] and
> [PR|https://github.com/apache/kafka/pull/2328].
> This issue is discussed in [mailing 
> list|https://lists.apache.org/thread.html/1fa014de0f57e31c877420b42df6d2fb9e2768492a9a2943d321c0e3@%3Cdev.samza.apache.org%3E].
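The package mismatch described in the issue can be reproduced without Kafka on the classpath. What follows is a minimal sketch under stated assumptions, not the merged patch: {{OldKafkaCommon}} and {{NewKafkaErrors}} are hypothetical stand-ins for the {{kafka.common}} and {{org.apache.kafka.common.errors}} packages, and {{fetch}}, {{isValidOffsetBuggy}}, {{isValidOffsetFixed}} and {{startingOffset}} are hypothetical helpers. It shows that a handler matching one {{OffsetOutOfRangeException}} class lets a same-named class from another package escape, and that matching both classes lets a {{BrokerProxy}}-style fallback to the reset offset run.

```scala
// Hypothetical stand-ins for the two real packages; these are NOT the real Kafka classes.
object OldKafkaCommon {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}

object NewKafkaErrors {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}

object OffsetCatchDemo {
  // Hypothetical fetch: a "broker" whose log now starts at `earliest` rejects
  // older offsets with the new-package exception class, as seen in the logs.
  def fetch(offset: Long, earliest: Long): Unit =
    if (offset < earliest)
      throw new NewKafkaErrors.OffsetOutOfRangeException(
        s"offset $offset is below earliest available offset $earliest")

  // Mirrors the 0.14.1 handler: only the old-package class is matched,
  // so the new-package exception propagates instead of becoming `false`.
  def isValidOffsetBuggy(offset: Long, earliest: Long): Boolean =
    try { fetch(offset, earliest); true }
    catch { case _: OldKafkaCommon.OffsetOutOfRangeException => false }

  // One possible fix (an assumption): treat either class as "out of range".
  def isValidOffsetFixed(offset: Long, earliest: Long): Boolean =
    try { fetch(offset, earliest); true }
    catch {
      case _: OldKafkaCommon.OffsetOutOfRangeException => false
      case _: NewKafkaErrors.OffsetOutOfRangeException => false
    }

  // Sketch of the addTopicPartition fallback: keep the checkpointed offset if it
  // is valid, otherwise use the reset offset (here `earliest`, standing in for
  // auto.offset.reset=smallest).
  def startingOffset(checkpoint: Option[Long], earliest: Long): Long =
    checkpoint match {
      case Some(o) if isValidOffsetFixed(o, earliest) => o
      case _ => earliest
    }

  def main(args: Array[String]): Unit = {
    println(isValidOffsetFixed(5L, 10L))   // prints false
    println(startingOffset(Some(5L), 10L)) // prints 10
    // The buggy handler lets the new-package exception escape to the caller.
    val escaped =
      try { isValidOffsetBuggy(5L, 10L); false }
      catch { case _: NewKafkaErrors.OffsetOutOfRangeException => true }
    println(escaped)                       // prints true
  }
}
```

With a checkpoint of 5 and an earliest available offset of 10, the fixed helper returns false and the fallback yields 10, while the buggy helper throws instead of returning, so the caller's warning branch is never reached. Matching both classes is only one option; upgrading to a single exception type throughout would also work.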



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
