[ https://issues.apache.org/jira/browse/SAMZA-1822?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592571#comment-16592571 ]
DM commented on SAMZA-1822:
---------------------------
Uploaded Gaurav Agarwal's patch, as discussed on the [mailing list|https://lists.apache.org/thread.html/1fa014de0f57e31c877420b42df6d2fb9e2768492a9a2943d321c0e3@%3Cdev.samza.apache.org%3E].
> Samza 0.14.1 not correctly handling OffsetOutOfRangeException exception
> -----------------------------------------------------------------------
>
> Key: SAMZA-1822
> URL: https://issues.apache.org/jira/browse/SAMZA-1822
> Project: Samza
> Issue Type: Bug
> Affects Versions: 0.14.0, 0.14.1
> Reporter: DM
> Priority: Major
> Attachments: patch.txt
>
>
>
> Samza is requesting a Kafka partition offset that is too old (i.e., the Kafka
> log has moved ahead). We set the property {{consumer.auto.offset.reset}} to
> {{smallest}} and therefore expect Samza to reset its checkpoint to the
> earliest available partition offset in this scenario. That is not happening;
> instead, we continually get exceptions of this form:
> {code:java}
> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] kafka.producer.SyncProducer:[Logging_class:info:66] - [main] - Disconnecting from vrni-platform-release:9092
> INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset 56443499 for topic and partition Topic3-0
> WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658] system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While refreshing brokers for Topic3-0: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying
> {code}
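The behavior expected from {{consumer.auto.offset.reset}} set to {{smallest}} can be modeled with a minimal Scala sketch. {{AutoOffsetResetDemo}}, {{isOutOfRange}}, {{resetOffset}} and {{startingOffset}} are hypothetical names, not Samza or Kafka APIs. The sketch assumes the old-consumer values {{smallest}} and {{largest}}, and treats a checkpoint as out of range when it is below the earliest retained offset or past the log end offset.

```scala
// Hypothetical model of the expected reset behavior; not Samza's GetOffset code.
object AutoOffsetResetDemo {
  // A checkpoint is out of range when retention has already deleted it
  // (below the earliest offset) or when it lies past the log end offset.
  def isOutOfRange(checkpoint: Long, earliest: Long, logEnd: Long): Boolean =
    checkpoint < earliest || checkpoint > logEnd

  // Maps an assumed old-consumer auto.offset.reset value to a resume offset.
  def resetOffset(policy: String, earliest: Long, logEnd: Long): Long = policy match {
    case "smallest" => earliest
    case "largest"  => logEnd
    case other      => throw new IllegalArgumentException(s"Unsupported auto.offset.reset value: $other")
  }

  // Keeps a valid checkpoint; otherwise applies the reset policy.
  def startingOffset(checkpoint: Long, policy: String, earliest: Long, logEnd: Long): Long =
    if (isOutOfRange(checkpoint, earliest, logEnd)) resetOffset(policy, earliest, logEnd)
    else checkpoint
}
```

For example, with the stale checkpoint 56443499 from the log above and an assumed earliest retained offset of 60000000, {{startingOffset(56443499L, "smallest", 60000000L, 70000000L)}} returns 60000000.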
> Browsing through the code, {{GetOffset::isValidOffset}} should catch
> {{OffsetOutOfRangeException}} and convert it to {{false}}, but that is not
> happening. This appears to be caused by a package mismatch: the
> [GetOffset|https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala#L56]
> class catches the exception imported via
> {{import kafka.common.OffsetOutOfRangeException}}, but the logs show that the
> exception actually thrown is
> {{org.apache.kafka.common.errors.OffsetOutOfRangeException}}.
> {code:java}
> def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
>   info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))
>   try {
>     val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))
>     if (messages.hasError) {
>       KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
>     }
>     info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))
>     true
>   } catch {
>     case e: OffsetOutOfRangeException => false
>   }
> }
> {code}
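The suspected package mismatch can be illustrated with a minimal Scala sketch. {{LegacyErrors}} and {{ClientErrors}} are hypothetical stand-ins for {{kafka.common}} and {{org.apache.kafka.common.errors}}, and {{CatchMismatchDemo}} is a hypothetical simplification of {{isValidOffset}}. Assuming the two exception classes are unrelated by inheritance, a typed {{catch}} pattern written for one never matches the other, so the exception escapes instead of becoming {{false}}.

```scala
// Hypothetical stand-ins: same simple name, different namespaces, no inheritance link.
object LegacyErrors {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}
object ClientErrors {
  class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
}

object CatchMismatchDemo {
  // Same shape as isValidOffset: only the legacy class is mapped to false.
  def isValidOffset(fetch: () => Unit): Boolean =
    try { fetch(); true }
    catch { case _: LegacyErrors.OffsetOutOfRangeException => false }

  // Reports what a caller observes: "true", "false", or "propagated".
  def outcome(fetch: () => Unit): String =
    try { isValidOffset(fetch).toString }
    catch { case _: ClientErrors.OffsetOutOfRangeException => "propagated" }
}
```

Calling {{CatchMismatchDemo.outcome}} with a fetch that throws the client-side stand-in yields {{"propagated"}}, matching the reported symptom that the exception reaches the caller.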
> Also, the
> [BrokerProxy|https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L85]
> class, the caller of {{GetOffset}}, should log a warning beginning with
> {{It appears that...}} when it gets a {{false}} value, but that line is not
> being logged. This indicates that an exception thrown in the {{GetOffset}}
> method is going uncaught and propagating up:
> {code:java}
> def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
>   debug("Adding new topic and partition %s to queue for %s" format (tp, host))
>   if (nextOffsets.asJava.containsKey(tp)) {
>     toss("Already consuming TopicPartition %s" format tp)
>   }
>   val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
>     nextOffset
>       .get
>       .toLong
>   } else {
>     warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
>     offsetGetter.getResetOffset(simpleConsumer, tp)
>   }
>   debug("Got offset %s for new topic and partition %s." format (offset, tp))
>   nextOffsets += tp -> offset
>   metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
> }
> {code}
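The effect on {{addTopicPartition}} can be sketched in Scala. {{ResetFallbackDemo}}, {{chooseOffset}}, {{narrowCheck}} and {{widenedCheck}} are hypothetical simplifications, not Samza code, and the two exception classes are stand-ins. If the validity check lets the client-side exception escape, {{chooseOffset}} throws before reaching the reset branch, so the {{It appears that...}} warning is never logged; a check that also maps that exception to {{false}} falls through to the reset offset. This only illustrates the control flow and is not the attached patch.

```scala
// Simplified, hypothetical model of BrokerProxy.addTopicPartition's offset choice.
object ResetFallbackDemo {
  // Stand-ins for kafka.common.OffsetOutOfRangeException and
  // org.apache.kafka.common.errors.OffsetOutOfRangeException.
  class LegacyOffsetOutOfRange(msg: String) extends RuntimeException(msg)
  class ClientOffsetOutOfRange(msg: String) extends RuntimeException(msg)

  // Uses the checkpointed offset if it validates, otherwise the reset offset
  // (the branch where BrokerProxy logs "It appears that...").
  def chooseOffset(next: Option[String], isValid: String => Boolean, resetOffset: => Long): Long =
    if (next.isDefined && isValid(next.get)) next.get.toLong
    else resetOffset

  // Catches only the legacy class, so the client exception propagates.
  def narrowCheck(fetch: String => Unit)(offset: String): Boolean =
    try { fetch(offset); true }
    catch { case _: LegacyOffsetOutOfRange => false }

  // Treats either class as an invalid offset.
  def widenedCheck(fetch: String => Unit)(offset: String): Boolean =
    try { fetch(offset); true }
    catch { case _: LegacyOffsetOutOfRange | _: ClientOffsetOutOfRange => false }
}
```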
> This behavior is caused by
> [KIP-109|https://cwiki.apache.org/confluence/display/KAFKA/KIP-109%3A+Old+Consumer+Deprecation]
> and its related [JIRA|http://example.com/] and
> [PR|https://github.com/apache/kafka/pull/2328].
> This issue is discussed on the [mailing list|https://lists.apache.org/thread.html/1fa014de0f57e31c877420b42df6d2fb9e2768492a9a2943d321c0e3@%3Cdev.samza.apache.org%3E].