[ 
https://issues.apache.org/jira/browse/SPARK-57438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18089242#comment-18089242
 ] 

Anupam Yadav commented on SPARK-57438:
--------------------------------------

Opened https://github.com/apache/spark/pull/56526 with a fix based on your 
diagnosis and proposed patch. The PR changes the non-RTM branch to use 
Option(latestPartitionOffsets) (your fix) and also hardens the companion 
metrics() so that the public method cannot throw a NullPointerException on 
Some(null). I credited you in the PR description for the report, root-cause 
analysis, and patch. If you would like to be added as Co-authored-by on the 
commit, let me know the name and email you would like used. Thanks for the 
clear write-up!
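
For readers following along, here is a minimal standalone sketch of the 
difference between Some(null) and Option(null), and of a reader that 
tolerates both. It is not the code from the PR: the object name, the Offsets 
alias, and the wrapUnsafe, wrapSafe, and lagPerPartition helpers are 
hypothetical stand-ins for illustration only.

```scala
// Hypothetical stand-in for the Kafka source's offset handling; not Spark's actual code.
object SomeNullSketch {
  // Assumed shape: partition name -> latest offset.
  type Offsets = Map[String, Long]

  // Mirrors the failure mode: Some(null) is a non-empty Option that holds null.
  def wrapUnsafe(latest: Offsets): Option[Offsets] = Some(latest)

  // Option.apply maps a null argument to None.
  def wrapSafe(latest: Offsets): Option[Offsets] = Option(latest)

  // Defensive reader: flatMap(Option(_)) collapses Some(null) to None
  // before the map is touched, so a null payload cannot cause an NPE here.
  def lagPerPartition(latest: Option[Offsets], consumed: Offsets): Map[String, Long] =
    latest.flatMap(Option(_)) match {
      case Some(offsets) =>
        offsets.map { case (tp, end) =>
          tp -> math.max(0L, end - consumed.getOrElse(tp, 0L))
        }
      case None => Map.empty
    }

  def main(args: Array[String]): Unit = {
    val notFetchedYet: Offsets = null
    println(wrapUnsafe(notFetchedYet).isDefined) // true: the null is hidden inside Some
    println(wrapSafe(notFetchedYet).isDefined)   // false: null became None
    println(lagPerPartition(wrapUnsafe(notFetchedYet), Map("t-0" -> 5L)))
    println(lagPerPartition(Some(Map("t-0" -> 8L)), Map("t-0" -> 5L)))
  }
}
```

Wrapping with Option(...) at the producer removes the null at its source, 
while the flatMap guard in the reader is a second line of defence for 
callers that still pass Some(null).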

> NullPointerException in Kafka structured streaming source
> ---------------------------------------------------------
>
>                 Key: SPARK-57438
>                 URL: https://issues.apache.org/jira/browse/SPARK-57438
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.1.1, 4.1.2
>         Environment: Ubuntu 24.04
> Kubernetes
> PySpark 4.1.1
> spark-sql-kafka-0-10_2.13:4.1.1
>  
>            Reporter: Thomas Newton
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: kafka_streaming_null_pointer_fix.patch
>
>
> When running many Kafka sources in Spark Structured Streaming, the sources 
> occasionally crash with the following error:
> ```
> java.lang.NullPointerException: Cannot invoke "scala.collection.IterableOps.map(scala.Function1)" because the return value of "scala.Option.get()" is null
>         at org.apache.spark.sql.kafka010.KafkaMicroBatchStream$.metrics(KafkaMicroBatchStream.scala:520) ~[kafka_extension_deploy.jar:?]
>         at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.metrics(KafkaMicroBatchStream.scala:363) ~[kafka_extension_deploy.jar:?]
>         at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$2(ProgressReporter.scala:384) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:496) ~[spark-core_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$1(ProgressReporter.scala:380) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at scala.collection.immutable.List.map(List.scala:236) ~[scala-library-2.13.17.jar:?]
>         at scala.collection.immutable.List.map(List.scala:79) ~[scala-library-2.13.17.jar:?]
>         at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.extractSourceProgress(ProgressReporter.scala:379) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.constructNewProgress(ProgressReporter.scala:348) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:312) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:429) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:525) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:461) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:461) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch(TriggerExecutor.scala:40) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:38) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.runOneBatch(TriggerExecutor.scala:60) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.execute(TriggerExecutor.scala:65) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:461) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:347) [spark-sql_2.13-4.1.1.jar:4.1.1]
>         at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) ~[scala-library-2.13.17.jar:?]
>         at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) ~[spark-sql-api_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:307) [spark-sql_2.13-4.1.1.jar:4.1.1]
>         at org.apache.spark.sql.execution.streaming.runtime.StreamExecution$$anon$1.run(StreamExecution.scala:230) [spark-sql_2.13-4.1.1.jar:4.1.1]
> ```
>  
> I believe the problem was introduced by 
> [https://github.com/apache/spark/pull/52729]
>  
> The attached patch file seems to fix it. Without that patch, I think it's 
> possible for [this 
> line|https://github.com/apache/spark/blob/c0690c763bafabd08e7079d1137fa0a769a05bae/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala#L360]
>  to create `Some(null)`, which later causes the error. 



