Yang Guo created SPARK-49123:
--------------------------------
Summary: Improve the logging behavior when Spark loaded zero
result from Kafka after poll timeout
Key: SPARK-49123
URL: https://issues.apache.org/jira/browse/SPARK-49123
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.3.0
Environment: java version: openjdk version "1.8.0_402"
python version: 3.10.2
os: amazon linux 2
Reporter: Yang Guo
When using Spark Structured Streaming to read data from Kafka, a poll request
can time out if the producer writes to the topic in transaction mode
but does not commit the transaction for a long time.
When this happens, `r.isEmpty` is `true` but `offsetAfterPoll` has changed.
No error is thrown and no warning is logged, so users have no idea
why the related Spark task was delayed for `pollTimeoutMs`. Here is the related
[code|https://github.com/apache/spark/blob/v3.3.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala#L85-L101]:
{code:java}
if (r.isEmpty) {
  // We cannot fetch anything after `poll`. Two possible cases:
  // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
  //   be thrown.
  // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
  // - Fetched something but all of them are not invisible. This is a valid case and let the
  //   caller handles this.
  if (offset < range.earliest || offset >= range.latest) {
    throw new OffsetOutOfRangeException(
      Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
  } else if (offset == offsetAfterPoll) {
    throw new TimeoutException(
      s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
  }
}
{code}
To make troubleshooting easier when a poll returns zero records before the
timeout but `offsetAfterPoll` has changed, a warning- or error-level log
should be added. A sample looks like the following:
{code:java}
if (r.isEmpty) {
  // We cannot fetch anything after `poll`. Two possible cases:
  // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
  //   be thrown.
  // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
  // - Fetched something but all of them are invisible. This is a valid case and let the
  //   caller handles this. Although this is a valid case, it is necessary to add warning
  //   level logs to indicate zero data returned before timeout.
  if (offset < range.earliest || offset >= range.latest) {
    throw new OffsetOutOfRangeException(
      Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
  } else if (offset == offsetAfterPoll) {
    throw new TimeoutException(
      s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
  } else {
    logWarning(s"After taking $pollTimeoutMs milliseconds polling, ZERO record fetched " +
      s"while offset changed from $offset to $offsetAfterPoll.")
  }
}
{code}
Also, fix the typo in the code comment: change `Fetched something but all of
them are not invisible` to `Fetched something but all of them are invisible`.
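The three outcomes of an empty poll described above can also be sketched as a small standalone function, which makes the proposed "warn only in the third case" behavior easy to unit test. This is only an illustration under assumptions: `EmptyPollOutcome`, `EmptyPollClassifier`, `classify` and `warningFor` are hypothetical names, not part of `KafkaDataConsumer`, and the real code throws exceptions for the first two cases instead of returning values.

```scala
// Hypothetical sketch; none of these names exist in Spark's KafkaDataConsumer.
sealed trait EmptyPollOutcome
case object OutOfRange extends EmptyPollOutcome
case object TimedOut extends EmptyPollOutcome
final case class SkippedInvisible(from: Long, to: Long) extends EmptyPollOutcome

object EmptyPollClassifier {

  /** Classify a poll that returned no records.
    * `earliest` is inclusive and `latest` is exclusive, mirroring the
    * range check in the snippet from the issue. */
  def classify(
      offset: Long,
      offsetAfterPoll: Long,
      earliest: Long,
      latest: Long): EmptyPollOutcome = {
    if (offset < earliest || offset >= latest) {
      OutOfRange // the real code throws OffsetOutOfRangeException here
    } else if (offset == offsetAfterPoll) {
      TimedOut // the real code throws TimeoutException here
    } else {
      // Nothing was returned, yet the consumer position moved.
      SkippedInvisible(offset, offsetAfterPoll)
    }
  }

  /** Build the warning text only for the case that is silent today. */
  def warningFor(outcome: EmptyPollOutcome, pollTimeoutMs: Long): Option[String] =
    outcome match {
      case SkippedInvisible(from, to) =>
        Some(s"Polled for up to $pollTimeoutMs ms and fetched zero records, " +
          s"but the offset changed from $from to $to.")
      case _ => None
    }
}
```

For example, `EmptyPollClassifier.classify(5L, 8L, 0L, 10L)` yields `SkippedInvisible(5, 8)`, and `warningFor` returns a message only for that outcome, so the two existing exception paths are left unchanged.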