[
https://issues.apache.org/jira/browse/SPARK-57438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18088961#comment-18088961
]
Anupam Yadav commented on SPARK-57438:
--------------------------------------
I'm looking into this and working on a fix.
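For anyone following along: the NPE message below says the return value of "scala.Option.get()" is null, which can only happen when the Option is Some(null) rather than None. Here is a minimal Scala sketch of that failure mode and one possible guard. All names in it (SomeNullSketch, Offsets, lagUnsafe, lagSafe) are hypothetical, and it is not taken from KafkaMicroBatchStream.scala or from the attached patch; it only illustrates the general pattern under those assumptions.

```scala
// Hypothetical sketch; names and types are illustrative, not Spark's.
object SomeNullSketch {
  type Offsets = Map[String, Long]

  // Unsafe: Some(null) is a defined Option, so .get returns null
  // and the following .map call throws a NullPointerException.
  def lagUnsafe(latest: Option[Offsets]): Map[String, String] =
    latest.get.map { case (tp, off) => tp -> off.toString }

  // Safer: Option(_) converts a null payload into None, so the
  // caller falls back to an empty map instead of dereferencing null.
  def lagSafe(latest: Option[Offsets]): Map[String, String] =
    latest
      .flatMap(Option(_))
      .map(_.map { case (tp, off) => tp -> off.toString })
      .getOrElse(Map.empty)

  def main(args: Array[String]): Unit = {
    // Assumption for illustration: the wrapped value is null.
    val wrapped: Option[Offsets] = Some(null)
    println(scala.util.Try(lagUnsafe(wrapped)).isFailure)
    println(lagSafe(wrapped))
  }
}
```

Whether the actual fix should guard at the read site like this, or avoid ever constructing Some(null) in the first place, depends on the real code path.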
> NullPointerException in Kafka structured streaming source
> ---------------------------------------------------------
>
> Key: SPARK-57438
> URL: https://issues.apache.org/jira/browse/SPARK-57438
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 4.1.1, 4.1.2
> Environment: Ubuntu 24.04
> Kubernetes
> PySpark 4.1.1
> spark-sql-kafka-0-10_2.13:4.1.1
>
> Reporter: Thomas Newton
> Priority: Major
> Attachments: kafka_streaming_null_pointer_fix.patch
>
>
> When running many Kafka sources in Spark Structured Streaming, the
> queries occasionally crash with the following error:
> ```
> java.lang.NullPointerException: Cannot invoke "scala.collection.IterableOps.map(scala.Function1)" because the return value of "scala.Option.get()" is null
>     at org.apache.spark.sql.kafka010.KafkaMicroBatchStream$.metrics(KafkaMicroBatchStream.scala:520) ~[kafka_extension_deploy.jar:?]
>     at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.metrics(KafkaMicroBatchStream.scala:363) ~[kafka_extension_deploy.jar:?]
>     at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$2(ProgressReporter.scala:384) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:496) ~[spark-core_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$1(ProgressReporter.scala:380) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at scala.collection.immutable.List.map(List.scala:236) ~[scala-library-2.13.17.jar:?]
>     at scala.collection.immutable.List.map(List.scala:79) ~[scala-library-2.13.17.jar:?]
>     at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.extractSourceProgress(ProgressReporter.scala:379) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.constructNewProgress(ProgressReporter.scala:348) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:312) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:429) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:525) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:461) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:461) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch(TriggerExecutor.scala:40) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:38) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.runOneBatch(TriggerExecutor.scala:60) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.execute(TriggerExecutor.scala:65) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:461) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:347) [spark-sql_2.13-4.1.1.jar:4.1.1]
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) ~[scala-library-2.13.17.jar:?]
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) ~[spark-sql-api_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:307) [spark-sql_2.13-4.1.1.jar:4.1.1]
>     at org.apache.spark.sql.execution.streaming.runtime.StreamExecution$$anon$1.run(StreamExecution.scala:230) [spark-sql_2.13-4.1.1.jar:4.1.1]
> ```
>
> I believe the problem was introduced by
> [https://github.com/apache/spark/pull/52729]
>
> The attached patch file (kafka_streaming_null_pointer_fix.patch) seems to fix it.