cxb created SPARK-39132:
---------------------------

             Summary: Spark 3.2.1 cache throws NPE
                 Key: SPARK-39132
                 URL: https://issues.apache.org/jira/browse/SPARK-39132
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 3.2.1
         Environment: I run it with one driver and 2 executors; each executor is 
allocated 2g of memory, and old generation usage is about 50%, so I think it is healthy
            Reporter: cxb


After I upgraded Spark to version 3.2.1, a job that has been running for about 
1 day throws the exception below.

gc log: 
{code:java}
Heap
 par new generation   total 307840K, used 239453K [0x0000000080000000, 
0x0000000094e00000, 0x00000000aaaa0000)
  eden space 273664K,  81% used [0x0000000080000000, 0x000000008da4bdd0, 
0x0000000090b40000)
  from space 34176K,  46% used [0x0000000092ca0000, 0x0000000093c2b6b8, 
0x0000000094e00000)
  to   space 34176K,   0% used [0x0000000090b40000, 0x0000000090b40000, 
0x0000000092ca0000)
 concurrent mark-sweep generation total 811300K, used 451940K 
[0x00000000aaaa0000, 0x00000000dc2e9000, 0x0000000100000000)
 Metaspace       used 102593K, capacity 110232K, committed 121000K, reserved 
1155072K
  class space    used 12473K, capacity 13482K, committed 15584K, reserved 
1048576K {code}
code:

{{}}{{}}

 
{code:java}
sparkSession
.readStream
.format('kafka')
.load
.repartition(4)
...project
.watermark
.groupby(k1, k2)
.agg(size(collect_set('xxx')))
.writeStream 
.foreachBatch(function test)
.start

def test:(Dataset[Row], Long) => Unit = (ds: Dataset[Row], _: Long) => {
      ds.persist(StorageLevel.MEMORY_AND_DISK_SER)
      ds.write
        .option("collection", s"col_1")
        .option("maxBatchSize", "2048")
        .mode("append")
        .mongo()
      ds..write
        .option("collection", s"col_2")
        .option("maxBatchSize", "2048")
        .mode("append")
        .mongo()
      ds.unpersist()
}{code}
 

 

exception log:

 
{code:java}

{code}
22/05/09 21:11:28 ERROR streaming.MicroBatchExecution: Query rydts_regist_gp 
[id = 669c2031-71b2-422b-859d-336722d289e9, runId = 
049de32c-e6ff-48f1-8742-bb95122a36ea] terminated with error
java.lang.NullPointerException
    at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder.$anonfun$isCachedRDDLoaded$1(InMemoryRelation.scala:248)
    at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder.$anonfun$isCachedRDDLoaded$1$adapted(InMemoryRelation.scala:247)
    at 
scala.collection.IndexedSeqOptimized.prefixLengthImpl(IndexedSeqOptimized.scala:41)
    at scala.collection.IndexedSeqOptimized.forall(IndexedSeqOptimized.scala:46)
    at 
scala.collection.IndexedSeqOptimized.forall$(IndexedSeqOptimized.scala:46)
    at scala.collection.mutable.ArrayOps$ofRef.forall(ArrayOps.scala:198)
    at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedRDDLoaded(InMemoryRelation.scala:247)
    at 
org.apache.spark.sql.execution.columnar.CachedRDDBuilder.isCachedColumnBuffersLoaded(InMemoryRelation.scala:241)
    at 
org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8(CacheManager.scala:189)
    at 
org.apache.spark.sql.execution.CacheManager.$anonfun$uncacheQuery$8$adapted(CacheManager.scala:176)
    at 
scala.collection.TraversableLike.$anonfun$filterImpl$1(TraversableLike.scala:304)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.filterImpl(TraversableLike.scala:303)
    at scala.collection.TraversableLike.filterImpl$(TraversableLike.scala:297)
    at scala.collection.AbstractTraversable.filterImpl(Traversable.scala:108)
    at scala.collection.TraversableLike.filter(TraversableLike.scala:395)
    at scala.collection.TraversableLike.filter$(TraversableLike.scala:395)
    at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
    at 
org.apache.spark.sql.execution.CacheManager.recacheByCondition(CacheManager.scala:219)
    at 
org.apache.spark.sql.execution.CacheManager.uncacheQuery(CacheManager.scala:176)
    at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3220)
    at org.apache.spark.sql.Dataset.unpersist(Dataset.scala:3231)
    at common.job.xxx$.$anonfun$main$3(xxx.scala:117)
    at common.job.xxx$.$anonfun$main$3$adapted(xxx.scala:103)
    at 
org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:600)
    at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:598)
    at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:598)
    at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:228)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:375)
    at 
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:373)
    at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
    at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:193)
    at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:187)
    at 
org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:303)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
    at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:209)
 

 

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to