Yang Guo created SPARK-49123:
--------------------------------

             Summary: Improve the logging behavior when Spark loaded zero result from Kafka after poll timeout
                 Key: SPARK-49123
                 URL: https://issues.apache.org/jira/browse/SPARK-49123
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.3.0
         Environment: java version: openjdk version "1.8.0_402"
                      python version: 3.10.2
                      os: amazon linux 2
            Reporter: Yang Guo


When using Spark Structured Streaming to read data from Kafka, the poll request can time out if the producer writes data to the topic in transactional mode but does not commit the transaction for a long time.

When this happens, `r.isEmpty` is `true` but `offsetAfterPoll` has changed. In this case no error is thrown and no warning is logged, so users have no idea why the related Spark task was delayed by `pollTimeoutMs`. Here is the related [code|https://github.com/apache/spark/blob/v3.3.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala#L85-L101]:

{code:java}
    if (r.isEmpty) {
      // We cannot fetch anything after `poll`. Two possible cases:
      // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
      //   be thrown.
      // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
      // - Fetched something but all of them are not invisible. This is a valid case and let the
      //   caller handles this.
      if (offset < range.earliest || offset >= range.latest) {
        throw new OffsetOutOfRangeException(
          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
      } else if (offset == offsetAfterPoll) {
        throw new TimeoutException(
          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
      }
    }{code}
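The third, silent case ("all of them are invisible") can be pictured with a simplified model. It assumes the consumer uses `isolation.level=read_committed`. Under that setting, a poll can return no records while the consumer position still moves forward, for three reasons:

* Transaction control markers (commit/abort) occupy offsets but are never returned to the application.
* Records from aborted transactions are filtered out.
* The consumer cannot read past the last stable offset while a transaction is still open.

The dependency-free Scala sketch below models only that idea. `InvisibleRecordsModel`, `Entry`, `poll`, and the `lso` parameter are hypothetical names, not Kafka or Spark APIs. A real fetch also works on record batches rather than single entries.

{code:scala}
// Simplified, hypothetical model (not Kafka's or Spark's real code) of how a
// read_committed poll can return zero records while the position advances.
object InvisibleRecordsModel {
  // Each entry occupies exactly one offset in the modeled partition log.
  sealed trait Entry
  final case class Committed(value: String) extends Entry // visible to read_committed
  final case class Aborted(value: String) extends Entry   // filtered out by read_committed
  case object ControlMarker extends Entry                  // commit/abort marker, never returned

  // Visible records handed to the caller, plus the position after the poll.
  final case class PollResult(records: Seq[String], positionAfterPoll: Long)

  // Reads up to `maxEntries` entries starting at `offset`, never going past the
  // last stable offset `lso` (entries beyond it belong to an open transaction).
  def poll(log: IndexedSeq[Entry], offset: Long, lso: Long, maxEntries: Int): PollResult = {
    val stable = math.min(lso, log.size.toLong)
    // The position never moves backwards, even if offset is already at `stable`.
    val end = math.max(offset, math.min(stable, offset + maxEntries))
    val fetched = (offset until end).map(o => log(o.toInt))
    val visible = fetched.collect { case Committed(v) => v }
    PollResult(visible, end)
  }
}
{code}

For example, polling the log `Aborted, Aborted, ControlMarker, Committed` from offset 0 with `maxEntries = 3` returns no records but moves the position to 3. In the Spark snippet this corresponds to `r.isEmpty` with `offset != offsetAfterPoll`, the branch that currently produces neither an exception nor a log line.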
 

 

To make troubleshooting easier when a poll returns zero records before the timeout even though `offsetAfterPoll` has changed, Spark should emit a warning- or error-level log. A sample change could look like the following:

{code:java}
    if (r.isEmpty) {
      // We cannot fetch anything after `poll`. Three possible cases:
      // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
      //   be thrown.
      // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
      // - Fetched something but all of them are invisible. This is a valid case and we let the
      //   caller handle it. Even so, log a warning to indicate that zero records were returned
      //   before the timeout.
      if (offset < range.earliest || offset >= range.latest) {
        throw new OffsetOutOfRangeException(
          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
      } else if (offset == offsetAfterPoll) {
        throw new TimeoutException(
          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
      } else {
        logWarning(s"After polling for $pollTimeoutMs milliseconds, zero records were fetched " +
          s"while the offset changed from $offset to $offsetAfterPoll.")
      }
    }{code}
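The proposed branch order can be checked in isolation with a dependency-free Scala model. It returns an outcome instead of throwing or logging. `EmptyPollHandling`, `Outcome`, and `classify` are hypothetical names, not Spark code. `earliest` and `latest` stand in for `range.earliest` and `range.latest`, and the warning text is only illustrative.

{code:scala}
// Hypothetical model of the proposed empty-poll handling. It mirrors the branch
// order of the proposal but returns a value instead of throwing or logging.
object EmptyPollHandling {
  sealed trait Outcome
  final case class OutOfRange(offset: Long) extends Outcome
  final case class Timeout(message: String) extends Outcome
  final case class AdvancedWithoutRecords(warning: String) extends Outcome

  def classify(
      offset: Long,
      offsetAfterPoll: Long,
      earliest: Long,
      latest: Long,
      pollTimeoutMs: Long): Outcome = {
    if (offset < earliest || offset >= latest) {
      // Corresponds to throwing OffsetOutOfRangeException.
      OutOfRange(offset)
    } else if (offset == offsetAfterPoll) {
      // Nothing was fetched and the position did not move: a real timeout.
      Timeout(s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
    } else {
      // Position moved but no visible records came back. The caller still handles
      // this case; the proposal only adds a warning explaining the elapsed timeout.
      AdvancedWithoutRecords(
        s"After polling for $pollTimeoutMs milliseconds, zero records were fetched " +
          s"while the offset changed from $offset to $offsetAfterPoll.")
    }
  }
}
{code}

For example, with `offset = 5`, `latest = 10`, and `offsetAfterPoll = 8`, `classify` returns `AdvancedWithoutRecords`. That is the case the proposal wants to make visible in the logs.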
 

 

Also fix the typo in the comment: change `Fetched something but all of them are not invisible` to `Fetched something but all of them are invisible`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
