[ https://issues.apache.org/jira/browse/SAMZA-1822?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

DM updated SAMZA-1822:
----------------------
    Description: 
 

Samza is requesting a Kafka partition offset that is too old (i.e., the Kafka log has moved ahead). We set the property {{consumer.auto.offset.reset}} to {{smallest}} and therefore expect Samza to reset its checkpoint to the earliest available partition offset in this scenario. That is not happening; instead, we continually get exceptions of this form:
{code:java}
INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] kafka.producer.SyncProducer:[Logging_class:info:66] - [main] - Disconnecting from vrni-platform-release:9092
INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset 56443499 for topic and partition Topic3-0
WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658] system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While refreshing brokers for Topic3-0: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying
{code}
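To make the expectation concrete, here is a minimal sketch of the reset behavior expected from {{smallest}}. It is not Samza's implementation: {{ExpectedResetSketch}}, {{OffsetRange}}, and {{resumeOffset}} are hypothetical names, and the partition is modeled simply as an inclusive range of offsets the broker still retains.

{code:scala}
// Hypothetical sketch of the behavior expected from consumer.auto.offset.reset=smallest.
// Not Samza's implementation; all names here are illustrative only.
object ExpectedResetSketch {
  // Offsets the broker still retains for one partition, inclusive on both ends.
  final case class OffsetRange(earliest: Long, latest: Long) {
    def contains(offset: Long): Boolean = offset >= earliest && offset <= latest
  }

  // Resume from the checkpoint if the broker still has it; otherwise apply the reset policy.
  def resumeOffset(checkpointed: Long, range: OffsetRange, autoOffsetReset: String): Long =
    if (range.contains(checkpointed)) checkpointed
    else autoOffsetReset match {
      case "smallest" => range.earliest
      case "largest"  => range.latest
      case other      => throw new IllegalArgumentException(s"Unsupported auto.offset.reset value: $other")
    }
}
{code}

For example, with the checkpointed offset 56443499 from the log above and a hypothetical partition whose earliest retained offset is 60000000, {{resumeOffset(56443499L, OffsetRange(60000000L, 70000000L), "smallest")}} returns 60000000 instead of retrying the stale offset.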
Browsing through the code, it appears that {{GetOffset::isValidOffset}} should catch the {{OffsetOutOfRangeException}} and convert it to {{false}}, but this is not happening. The cause appears to be a mismatch in the package of the exception: the [GetOffset|https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala#L56] class catches {{kafka.common.OffsetOutOfRangeException}}, whereas the logs show that the exception actually thrown is {{org.apache.kafka.common.errors.OffsetOutOfRangeException}}.
{code:java}
// GetOffset.scala imports the exception from the old Scala client package:
import kafka.common.OffsetOutOfRangeException

def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
  info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))

  try {
    val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))

    if (messages.hasError) {
      KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
    }

    info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))

    true
  } catch {
    // Matches only kafka.common.OffsetOutOfRangeException; per the log above, the exception
    // actually thrown is org.apache.kafka.common.errors.OffsetOutOfRangeException.
    case e: OffsetOutOfRangeException => false
  }
}
{code}
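The suspected mismatch can be reproduced without Kafka on the classpath. In this sketch, the hypothetical objects {{OldClient}} and {{NewClient}} stand in for the {{kafka.common}} and {{org.apache.kafka.common.errors}} packages; the two exception classes share only their simple name, so a {{catch}} clause written against one does not match the other.

{code:scala}
// Self-contained reproduction of the suspected package mismatch.
// OldClient and NewClient are hypothetical stand-ins for the kafka.common and
// org.apache.kafka.common.errors packages.
object PackageMismatchDemo {
  object OldClient {
    class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
  }
  object NewClient {
    // Same simple name, but an unrelated class.
    class OffsetOutOfRangeException(msg: String) extends RuntimeException(msg)
  }

  // Shaped like GetOffset.isValidOffset: only the old-client class is caught.
  def isValidOffset(fetch: () => Unit): Boolean =
    try {
      fetch()
      true
    } catch {
      case _: OldClient.OffsetOutOfRangeException => false
    }

  // For contrast: also treats the new-client class as "offset out of range".
  def isValidOffsetBothClients(fetch: () => Unit): Boolean =
    try {
      fetch()
      true
    } catch {
      case _: OldClient.OffsetOutOfRangeException => false
      case _: NewClient.OffsetOutOfRangeException => false
    }
}
{code}

Passing {{isValidOffset}} a fetch that throws {{NewClient.OffsetOutOfRangeException}} propagates the exception to the caller instead of returning {{false}}, whereas {{isValidOffsetBothClients}} returns {{false}} for the same fetch.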
Also, the [BrokerProxy|https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L85] class, which calls {{GetOffset}}, would log {{It appears that...}} if it received {{false}}, but this line is never logged. This indicates that an exception raised in the {{GetOffset}} method goes uncaught and propagates up:
{code:java}
def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
  debug("Adding new topic and partition %s to queue for %s" format (tp, host))

  if (nextOffsets.asJava.containsKey(tp)) {
    toss("Already consuming TopicPartition %s" format tp)
  }

  // If isValidOffset throws instead of returning false, neither branch completes,
  // so the warning below is never logged and getResetOffset is never called.
  val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
    nextOffset
      .get
      .toLong
  } else {
    warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))

    offsetGetter.getResetOffset(simpleConsumer, tp)
  }

  debug("Got offset %s for new topic and partition %s." format (offset, tp))

  nextOffsets += tp -> offset

  metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
}
{code}
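The effect on {{addTopicPartition}} can be sketched the same way. The hypothetical {{chooseOffset}} below keeps only the branching logic, with the validity check, the reset-offset lookup, and the logger passed in as functions. When the check returns {{false}}, the warning is logged and the reset offset is used; when the check throws, the exception escapes before the {{else}} branch runs, which matches the missing {{It appears that...}} log line.

{code:scala}
// Hypothetical sketch of the offset-selection branch in BrokerProxy.addTopicPartition.
object FallbackSketch {
  // Hypothetical stand-in for an exception that escapes the validity check.
  class UncaughtFetchException(msg: String) extends RuntimeException(msg)

  def chooseOffset(
      nextOffset: Option[String],
      isValid: String => Boolean,
      resetOffset: () => Long,
      warn: String => Unit): Long =
    // If isValid throws, neither branch completes: no warning, no reset.
    if (nextOffset.isDefined && isValid(nextOffset.get)) {
      nextOffset.get.toLong
    } else {
      warn(s"It appears that we received an invalid or empty offset $nextOffset")
      resetOffset()
    }
}
{code}

Calling it with {{isValid = _ => false}} records one warning and returns the reset offset; calling it with an {{isValid}} that throws records no warning and fails with the thrown exception.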
This behavior is caused by [KIP-109|https://cwiki.apache.org/confluence/display/KAFKA/KIP-109%3A+Old+Consumer+Deprecation] and its related [JIRA|http://example.com/] and [PR|https://github.com/apache/kafka/pull/2328].

This issue is discussed on the [mailing list|https://lists.apache.org/thread.html/1fa014de0f57e31c877420b42df6d2fb9e2768492a9a2943d321c0e3@%3Cdev.samza.apache.org%3E].

  was:
 

Samza is requesting a Kafka partition offset that is too old (i.e., the Kafka log has moved ahead). We set the property {{consumer.auto.offset.reset}} to {{smallest}} and therefore expect Samza to reset its checkpoint to the earliest available partition offset in this scenario. That is not happening; instead, we continually get exceptions of this form:
{code:java}
INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] kafka.producer.SyncProducer:[Logging_class:info:66] - [main] - Disconnecting from vrni-platform-release:9092
INFO [2018-08-21 19:26:20,924] [U:669,F:454,T:1,123,M:2,658] system.kafka.GetOffset:[Logging_class:info:63] - [main] - Validating offset 56443499 for topic and partition Topic3-0
WARN [2018-08-21 19:26:20,925] [U:669,F:454,T:1,123,M:2,658] system.kafka.KafkaSystemConsumer:[Logging_class:warn:74] - [main] - While refreshing brokers for Topic3-0: org.apache.kafka.common.errors.OffsetOutOfRangeException: The requested offset is not within the range of offsets maintained by the server.. Retrying
{code}
Browsing through the code, it appears that {{GetOffset::isValidOffset}} should catch the {{OffsetOutOfRangeException}} and convert it to {{false}}, but this is not happening. The cause appears to be a mismatch in the package of the exception: the [GetOffset|https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/GetOffset.scala#L56] class catches {{kafka.common.OffsetOutOfRangeException}}, whereas the logs show that the exception actually thrown is {{org.apache.kafka.common.errors.OffsetOutOfRangeException}}.
{code:java}
// GetOffset.scala imports the exception from the old Scala client package:
import kafka.common.OffsetOutOfRangeException

def isValidOffset(consumer: DefaultFetchSimpleConsumer, topicAndPartition: TopicAndPartition, offset: String) = {
  info("Validating offset %s for topic and partition %s" format (offset, topicAndPartition))

  try {
    val messages = consumer.defaultFetch((topicAndPartition, offset.toLong))

    if (messages.hasError) {
      KafkaUtil.maybeThrowException(messages.error(topicAndPartition.topic, topicAndPartition.partition).exception())
    }

    info("Able to successfully read from offset %s for topic and partition %s. Using it to instantiate consumer." format (offset, topicAndPartition))

    true
  } catch {
    // Matches only kafka.common.OffsetOutOfRangeException; per the log above, the exception
    // actually thrown is org.apache.kafka.common.errors.OffsetOutOfRangeException.
    case e: OffsetOutOfRangeException => false
  }
}
{code}
Also, the [BrokerProxy|https://github.com/apache/samza/blob/0.14.1/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L85] class, which calls {{GetOffset}}, would log {{It appears that...}} if it received {{false}}, but this line is never logged. This indicates that an exception raised in the {{GetOffset}} method goes uncaught and propagates up:
{code:java}
def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
  debug("Adding new topic and partition %s to queue for %s" format (tp, host))

  if (nextOffsets.asJava.containsKey(tp)) {
    toss("Already consuming TopicPartition %s" format tp)
  }

  // If isValidOffset throws instead of returning false, neither branch completes,
  // so the warning below is never logged and getResetOffset is never called.
  val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
    nextOffset
      .get
      .toLong
  } else {
    warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))

    offsetGetter.getResetOffset(simpleConsumer, tp)
  }

  debug("Got offset %s for new topic and partition %s." format (offset, tp))

  nextOffsets += tp -> offset

  metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
}
{code}
This behavior is caused by [KIP-35|https://cwiki.apache.org/confluence/display/KAFKA/KIP-109%3A+Old+Consumer+Deprecation] and its related [JIRA|http://example.com/] and [PR|https://github.com/apache/kafka/pull/2328].

This issue is discussed on the [mailing list|https://lists.apache.org/thread.html/1fa014de0f57e31c877420b42df6d2fb9e2768492a9a2943d321c0e3@%3Cdev.samza.apache.org%3E].


> Samza 0.14.1 not correctly handling OffsetOutOfRangeException exception
> -----------------------------------------------------------------------
>
>                 Key: SAMZA-1822
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1822
>             Project: Samza
>          Issue Type: Bug
>    Affects Versions: 0.14.0, 0.14.1
>            Reporter: DM
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
