[ 
https://issues.apache.org/jira/browse/SPARK-57438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Thomas Newton updated SPARK-57438:
----------------------------------
    Description: 
When running many Kafka Spark Structured Streaming sources, they occasionally
crash with the following error:
```
java.lang.NullPointerException: Cannot invoke "scala.collection.IterableOps.map(scala.Function1)" because the return value of "scala.Option.get()" is null
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream$.metrics(KafkaMicroBatchStream.scala:520) ~[kafka_extension_deploy.jar:?]
        at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.metrics(KafkaMicroBatchStream.scala:363) ~[kafka_extension_deploy.jar:?]
        at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$2(ProgressReporter.scala:384) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:496) ~[spark-core_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.$anonfun$extractSourceProgress$1(ProgressReporter.scala:380) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at scala.collection.immutable.List.map(List.scala:236) ~[scala-library-2.13.17.jar:?]
        at scala.collection.immutable.List.map(List.scala:79) ~[scala-library-2.13.17.jar:?]
        at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.extractSourceProgress(ProgressReporter.scala:379) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.constructNewProgress(ProgressReporter.scala:348) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:312) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.ProgressContext.finishTrigger(ProgressReporter.scala:429) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:525) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:461) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:461) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch(TriggerExecutor.scala:40) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:38) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.runOneBatch(TriggerExecutor.scala:60) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.MultiBatchExecutor.execute(TriggerExecutor.scala:65) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:461) ~[spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:347) [spark-sql_2.13-4.1.1.jar:4.1.1]
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) ~[scala-library-2.13.17.jar:?]
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804) ~[spark-sql-api_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.StreamExecution.org$apache$spark$sql$execution$streaming$runtime$StreamExecution$$runStream(StreamExecution.scala:307) [spark-sql_2.13-4.1.1.jar:4.1.1]
        at org.apache.spark.sql.execution.streaming.runtime.StreamExecution$$anon$1.run(StreamExecution.scala:230) [spark-sql_2.13-4.1.1.jar:4.1.1]
```

 

I believe the problem was introduced by
[https://github.com/apache/spark/pull/52729].

The attached patch file seems to fix it.
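The NPE message says that `scala.Option.get()` returned `null`, which happens when an `Option` is a `Some` wrapping `null` rather than `None`. The following is a minimal, hypothetical Scala sketch of that failure mode and of one defensive pattern. The names (`OptionNullDemo`, `unsafeDoubledSum`, `safeDoubledSum`) are illustrative only; they are not taken from `KafkaMicroBatchStream`, and this is not the attached patch.

```scala
// Hypothetical illustration only: these names do not come from KafkaMicroBatchStream.
object OptionNullDemo {
  // Some(null) counts as a non-empty Option, yet its get() returns null.
  val wrappedNull: Option[Seq[Long]] = Some(null)

  // Option.apply turns null into None, which is what most callers expect.
  val normalized: Option[Seq[Long]] = Option(null)

  // Same shape as the failing frame: call .map on whatever get() returned.
  // Throws NullPointerException when opt is Some(null).
  def unsafeDoubledSum(opt: Option[Seq[Long]]): Long =
    opt.get.map(_ * 2).sum

  // Defensive variant: collapse Some(null) into None before touching the value.
  def safeDoubledSum(opt: Option[Seq[Long]]): Long =
    opt.flatMap(Option(_)).map(_.map(_ * 2).sum).getOrElse(0L)

  def main(args: Array[String]): Unit = {
    println(s"wrappedNull.isDefined = ${wrappedNull.isDefined}")
    println(s"normalized.isEmpty = ${normalized.isEmpty}")
    println(s"safe sum of Some(null) = ${safeDoubledSum(wrappedNull)}")
  }
}
```

A common way to avoid this class of bug is to build the `Option` with `Option(x)` instead of `Some(x)` whenever `x` may be `null`; whether that matches what the attached patch changes is not stated in this report.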

> NullPointerException in Kafka structured streaming source
> ---------------------------------------------------------
>
>                 Key: SPARK-57438
>                 URL: https://issues.apache.org/jira/browse/SPARK-57438
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.1.1, 4.1.2
>         Environment: Ubuntu 24.04
> Kubernetes
> PySpark 4.1.1
> spark-sql-kafka-0-10_2.13:4.1.1
>  
>            Reporter: Thomas Newton
>            Priority: Major
>         Attachments: kafka_streaming_null_pointer_fix.patch
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
