[ 
https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Puviarasu updated SPARK-31754:
------------------------------
    Description: 
When joining 2 streams with watermarking and windowing, we get a 
NullPointerException after the job has been running for a few minutes. 

After the failure, we analyzed the checkpoint offsets/sources and found the 
files for which the application failed. These files do not contain any null 
values in the join columns. 

We even started the job with those files and the application ran without 
failing. From this we concluded that the exception is not caused by the data 
from the streams.

*Code:*

 
{code:java}
val optionsMap1 = Map[String, String]("Path" -> "/path/to/source1", 
"maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" 
->"false", "checkpointLocation" -> "/path/to/checkpoint1", "rowsPerSecond" -> 
"1" )
 val optionsMap2 = Map[String, String]("Path" -> "/path/to/source2", 
"maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" 
->"false", "checkpointLocation" -> "/path/to/checkpoint2", "rowsPerSecond" -> 
"1" )
 
spark.readStream.format("parquet").options(optionsMap1).load().createTempView("source1")
 
spark.readStream.format("parquet").options(optionsMap2).load().createTempView("source2")
 spark.sql("select * from source1 where eventTime1 is not null and col1 is not 
null").withWatermark("eventTime1", "30 minutes").createTempView("viewNotNull1")
 spark.sql("select * from source2 where eventTime2 is not null and col2 is not 
null").withWatermark("eventTime2", "30 minutes").createTempView("viewNotNull2")
 spark.sql("select * from viewNotNull1 a join viewNotNull2 b on a.col1 = b.col2 
and a.eventTime1 >= b.eventTime2 and a.eventTime1 <= b.eventTime2 + interval 2 
hours").createTempView("join")
 val optionsMap3 = Map[String, String]("compression" -> "snappy","path" -> 
"/path/to/sink", "checkpointLocation" -> "/path/to/checkpoint3")
 spark.sql("select * from 
join").writeStream.outputMode("append").trigger(Trigger.ProcessingTime("5 
seconds")).format("parquet").options(optionsMap3).start()
{code}
 

*Exception:*

 
{code:java}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
Aborting TaskSet 4.0 because task 0 (partition 0)
cannot run anywhere due to node and executor blacklist.
Most recent failure:
Lost task 0.2 in stage 4.0 (TID 6, executor 3): java.lang.NullPointerException
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
 Source)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:197)
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:221)
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:157)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply$mcV$spala:338)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(Stream)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(Stream)
        at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:583)
        at 
org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:108)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.timeTakenMs(StreamingSymmetricHashJoinExec.scala:126)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1(StreamingSymmetricHashJ
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$processPartitions$1.apply$mcV$sp(St:361)
        at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
        at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
        at 
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at 
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Blacklisting behavior can be configured via spark.blacklist.*.
        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
        at scala.Option.foreach(Option.scala:257)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
        at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
        ... 19 more
Exception in thread "main" org.apache.spark.SparkException: 
Application application_2345 finished with failed status
        at org.apache.spark.deploy.yarn.Client.run(Client.scala:1158)
        at 
org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1606)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

{code}
 

  was:
When joining 2 streams with watermarking and windowing we are getting 
NullPointer Exception after running for few minutes. 

After failure we analyzed the checkpoint offsets/sources and found the files 
for which the application failed. These files are not having any null values in 
the join columns. 

We even started the job with the files and the application ran. From this we 
concluded that the exception is not because of the data from the streams.

*Code:*

 
{code:java}

{code}
*val optionsMap1 = Map[String, String]("Path" -> "/path/to/source1", 
"maxFilesPerTrigger" ->  "1", "latestFirst" -> "false", "fileNameOnly" 
->"false", "checkpointLocation" -> "/path/to/checkpoint1", "rowsPerSecond" -> 
"1" )
val optionsMap2 = Map[String, String]("Path" -> "/path/to/source2", 
"maxFilesPerTrigger" ->  "1", "latestFirst" -> "false", "fileNameOnly" 
->"false", "checkpointLocation" -> "/path/to/checkpoint2", "rowsPerSecond" -> 
"1" )
spark.readStream.format("parquet").options(optionsMap1).load().createTempView("source1")
spark.readStream.format("parquet").options(optionsMap2).load().createTempView("source2")
spark.sql("select * from source1 where eventTime1 is not null and col1 is not 
null").withWatermark("eventTime1", "30 minutes").createTempView("viewNotNull1")
spark.sql("select * from source2 where eventTime2 is not null and col2 is not 
null").withWatermark("eventTime2", "30 minutes").createTempView("viewNotNull2")
spark.sql("select * from viewNotNull1 a join  viewNotNull2 b on a.col1 = b.col2 
and a.eventTime1 >= b.eventTime2 and  a.eventTime1 <= b.eventTime2 + interval 2 
hours").createTempView("join")
val optionsMap3 = Map[String, String]("compression" -> "snappy","path" -> 
"/path/to/sink", "checkpointLocation" -> "/path/to/checkpoint3")
spark.sql("select * from 
join").writeStream.outputMode("append").trigger(Trigger.ProcessingTime("5 
seconds")).format("parquet").options(optionsMap3).start()*    

 

*Exception:*

 
{code:java}
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
Aborting TaskSet 4.0 because task 0 (partition 0)
cannot run anywhere due to node and executor blacklist.
Most recent failure:
Lost task 0.2 in stage 4.0 (TID 6, executor 3): java.lang.NullPointerException
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
 Source)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:197)
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:221)
        at 
org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:157)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply$mcV$spala:338)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(Stream)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(Stream)
        at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:583)
        at 
org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:108)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.timeTakenMs(StreamingSymmetricHashJoinExec.scala:126)
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1(StreamingSymmetricHashJ
        at 
org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$processPartitions$1.apply$mcV$sp(St:361)
        at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
        at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
 Source)
        at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
        at 
org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
        at 
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
        at 
org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Blacklisting behavior can be configured via spark.blacklist.*.        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
        at scala.Option.foreach(Option.scala:257)
        at 
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
        at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
        at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
        ... 19 moreException in thread "main" org.apache.spark.SparkException: 
Application application_2345 finished with failed status
        at org.apache.spark.deploy.yarn.Client.run(Client.scala:1158)
        at 
org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1606)
        at 
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
        at 
org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at 
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

{code}
 


> Spark Structured Streaming: NullPointerException in Stream Stream join
> ----------------------------------------------------------------------
>
>                 Key: SPARK-31754
>                 URL: https://issues.apache.org/jira/browse/SPARK-31754
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>         Environment: Spark Version : 2.4.0
> Hadoop Version : 3.0.0
>            Reporter: Puviarasu
>            Priority: Blocker
>              Labels: structured-streaming
>
> When joining 2 streams with watermarking and windowing, we get a 
> NullPointerException after the job has been running for a few minutes. 
> After the failure, we analyzed the checkpoint offsets/sources and found the 
> files for which the application failed. These files do not contain any null 
> values in the join columns. 
> We even started the job with those files and the application ran without 
> failing. From this we concluded that the exception is not caused by the data 
> from the streams.
> *Code:*
>  
> {code:java}
> val optionsMap1 = Map[String, String]("Path" -> "/path/to/source1", 
> "maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" 
> ->"false", "checkpointLocation" -> "/path/to/checkpoint1", "rowsPerSecond" -> 
> "1" )
>  val optionsMap2 = Map[String, String]("Path" -> "/path/to/source2", 
> "maxFilesPerTrigger" -> "1", "latestFirst" -> "false", "fileNameOnly" 
> ->"false", "checkpointLocation" -> "/path/to/checkpoint2", "rowsPerSecond" -> 
> "1" )
>  
> spark.readStream.format("parquet").options(optionsMap1).load().createTempView("source1")
>  
> spark.readStream.format("parquet").options(optionsMap2).load().createTempView("source2")
>  spark.sql("select * from source1 where eventTime1 is not null and col1 is 
> not null").withWatermark("eventTime1", "30 
> minutes").createTempView("viewNotNull1")
>  spark.sql("select * from source2 where eventTime2 is not null and col2 is 
> not null").withWatermark("eventTime2", "30 
> minutes").createTempView("viewNotNull2")
>  spark.sql("select * from viewNotNull1 a join viewNotNull2 b on a.col1 = 
> b.col2 and a.eventTime1 >= b.eventTime2 and a.eventTime1 <= b.eventTime2 + 
> interval 2 hours").createTempView("join")
>  val optionsMap3 = Map[String, String]("compression" -> "snappy","path" -> 
> "/path/to/sink", "checkpointLocation" -> "/path/to/checkpoint3")
>  spark.sql("select * from 
> join").writeStream.outputMode("append").trigger(Trigger.ProcessingTime("5 
> seconds")).format("parquet").options(optionsMap3).start()
> {code}
>  
> *Exception:*
>  
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
> Aborting TaskSet 4.0 because task 0 (partition 0)
> cannot run anywhere due to node and executor blacklist.
> Most recent failure:
> Lost task 0.2 in stage 4.0 (TID 6, executor 3): java.lang.NullPointerException
>         at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown
>  Source)
>         at 
> org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
>         at 
> org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
>         at 
> org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:197)
>         at 
> org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:221)
>         at 
> org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:157)
>         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>         at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
>         at 
> org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply$mcV$spala:338)
>         at 
> org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(Stream)
>         at 
> org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(Stream)
>         at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:583)
>         at 
> org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:108)
>         at 
> org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.timeTakenMs(StreamingSymmetricHashJoinExec.scala:126)
>         at 
> org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1(StreamingSymmetricHashJ
>         at 
> org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$processPartitions$1.apply$mcV$sp(St:361)
>         at 
> org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
>         at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
>         at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown
>  Source)
>         at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
>         at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>         at 
> org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
>         at 
> org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
>         at 
> org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
>         at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>         at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>         at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>         at org.apache.spark.scheduler.Task.run(Task.scala:121)
>         at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
>         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Blacklisting behavior can be configured via spark.blacklist.*.
>         at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
>         at scala.Option.foreach(Option.scala:257)
>         at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>         at 
> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
>         at 
> org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
>         ... 19 more
> Exception in thread "main" org.apache.spark.SparkException: Application 
> application_2345 finished with failed status
>         at org.apache.spark.deploy.yarn.Client.run(Client.scala:1158)
>         at 
> org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1606)
>         at 
> org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
>         at 
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
>         at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
>         at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>         at 
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to