[
https://issues.apache.org/jira/browse/SPARK-57438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18089242#comment-18089242
]
Anupam Yadav commented on SPARK-57438:
--------------------------------------
Opened https://github.com/apache/spark/pull/56526 with a fix based on your
diagnosis and proposed patch. The PR changes the non-RTM branch to
`Option(latestPartitionOffsets)` (your fix) and additionally hardens the
companion object's `metrics()` so that this public method cannot throw a
NullPointerException when given `Some(null)`. I credited you in the PR
description for the report, root-cause analysis, and patch. If you would like
to be added as a Co-authored-by on the commit, let me know the name/email you
would like used. Thanks for the clear write-up!
> NullPointerException in Kafka structured streaming source
> ---------------------------------------------------------
>
> Key: SPARK-57438
> URL: https://issues.apache.org/jira/browse/SPARK-57438
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 4.1.1, 4.1.2
> Environment: Ubuntu 24.04
> Kubernetes
> PySpark 4.1.1
> spark-sql-kafka-0-10_2.13:4.1.1
>
> Reporter: Thomas Newton
> Priority: Major
> Labels: pull-request-available
> Attachments: kafka_streaming_null_pointer_fix.patch
>
>
> When running many Kafka Spark Structured Streaming sources, they
> occasionally crash with the following error:
> ```
> java.lang.NullPointerException: Cannot invoke "scala.collection.IterableOps.map(scala.Function1)" because the return value of "scala.Option.get()" is null
>     at org.apache.spark.sql.kafka010.KafkaMicroBatchStream$.metrics(KafkaMicroBatchStream.scala:520) ~[kafka_extension_deploy.jar:?]
>     at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.metrics(KafkaMicroBatchStream.scala:363) ~[kafka_extension_deploy.jar:?]
>     at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$2(ProgressReporter.scala:384) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:496) ~[spark-core_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$1(ProgressReporter.scala:380) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at scala.collection.immutable.List.map(List.scala:236) ~[scala-library-2.13.17.jar:?]
>     at scala.collection.immutable.List.map(List.scala:79) ~[scala-library-2.13.17.jar:?]
>     at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.extractSourceProgress(ProgressReporter.scala:379) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.constructNewProgress(ProgressReporter.scala:348) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:312) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:429) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:525) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:461) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:461) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch(TriggerExecutor.scala:40) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:38) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.runOneBatch(TriggerExecutor.scala:60) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.execute(TriggerExecutor.scala:65) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:461) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:347) [spark-sql_2.13-4.1.1.jar:4.1.1]
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) ~[scala-library-2.13.17.jar:?]
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) ~[spark-sql-api_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:307) [spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.StreamExecution$$anon$1.run(StreamExecution.scala:230) [spark-sql_2.13-4.1.1.jar:4.1.1]
> ```
>
> I believe the problem was introduced by
> [https://github.com/apache/spark/pull/52729]
>
> The attached patch file seems to fix it. Without that patch, I think it's
> possible for [this
> line|https://github.com/apache/spark/blob/c0690c763bafabd08e7079d1137fa0a769a05bae/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala#L360]
> to produce `Some(null)`, which later causes the error.
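The difference between `Some(null)` and `Option(null)` can be reproduced in plain Scala without Spark. In the sketch below, `latestPartitionOffsets` is a hypothetical local stand-in for the connector's offset map, and `totalOffsets` is a hypothetical helper. The code mirrors only the shape of the failure in the stack trace, a `.get` on an `Option` that returns `null` followed by a `.map` call. It does not reproduce the connector's actual code.

```scala
import scala.util.Try

// Hypothetical stand-in for an offset map that was never populated.
val latestPartitionOffsets: Map[String, Long] = null

// Some(x) wraps x unconditionally; Option(x) returns None when x is null.
val wrappedWithSome: Option[Map[String, Long]] = Some(latestPartitionOffsets)
val wrappedWithOption: Option[Map[String, Long]] = Option(latestPartitionOffsets)

println(wrappedWithSome.isDefined)   // true
println(wrappedWithOption.isDefined) // false

// An isDefined guard lets Some(null) through, and .get.map(...) then calls a
// method on null: the same shape as the NullPointerException in the report.
def totalOffsets(offsets: Option[Map[String, Long]]): Try[Long] =
  Try(if (offsets.isDefined) offsets.get.map(_._2).sum else 0L)

println(totalOffsets(wrappedWithSome))   // a Failure wrapping a NullPointerException
println(totalOffsets(wrappedWithOption)) // Success(0)
```

With `Option(...)` in place of `Some(...)`, the unpopulated map becomes `None`, so the guard skips the `.get` entirely.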
--
This message was sent by Atlassian Jira
(v8.20.10#820010)