[
https://issues.apache.org/jira/browse/SPARK-31754?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17544670#comment-17544670
]
Ankit Verma commented on SPARK-31754:
-------------------------------------
While analysing a recent Production issue, we came across this bug, which looks
similar to a problem in one of our Spark streaming jobs (a query that does a
stream-stream join on data coming from two different Kafka topics).
Spark Version - 3.1.1
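For reference, the failing query has roughly the following shape. This is a
hypothetical sketch, not our actual production code: it assumes {{spark}} is the
active SparkSession, and the broker, topic, column, and checkpoint names are
placeholders.
{code:java}
import org.apache.spark.sql.functions.expr

// Left-side stream from the first Kafka topic (names are placeholders).
val left = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "topicA")
  .load()
  .selectExpr("CAST(value AS STRING) AS leftKey", "timestamp AS leftTime")
  .withWatermark("leftTime", "30 minutes")

// Right-side stream from the second Kafka topic.
val right = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "topicB")
  .load()
  .selectExpr("CAST(value AS STRING) AS rightKey", "timestamp AS rightTime")
  .withWatermark("rightTime", "30 minutes")

// Stream-stream inner join with an event-time range condition, written back
// to Kafka; the stack trace below fails inside this join's state manager.
left.join(right, expr(
    "leftKey = rightKey AND leftTime >= rightTime AND " +
    "leftTime <= rightTime + interval 2 hours"))
  .selectExpr("leftKey AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "outputTopic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()
{code}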
*Stack-trace:*
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling o2504.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 19 in stage 28.0 failed 4 times, most recent failure: Lost task 19.3 in stage 28.0 (TID 653) (ip-10-98-69-15.ap-southeast-2.compute.internal executor 6): java.lang.NullPointerException
  at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:236)
  at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:260)
  at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:196)
  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
  at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$21(StreamingSymmetricHashJoinExec.scala:408)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:597)
  at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:111)
  at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:111)
  at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.timeTakenMs(StreamingSymmetricHashJoinExec.scala:127)
  at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.onOutputCompletion$1(StreamingSymmetricHashJoinExec.scala:385)
  at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$25(StreamingSymmetricHashJoinExec.scala:431)
  at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:51)
  at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$2(KafkaWriter.scala:72)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
  at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$1(KafkaWriter.scala:73)
  at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$1$adapted(KafkaWriter.scala:70)
  at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1020)
  at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1020)
  at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2278)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
  at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:1020)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
  at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:1018)
  at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:70)
  at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:180)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
  at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NullPointerException
  at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:236)
  at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:260)
  at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:196)
  at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
  at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$21(StreamingSymmetricHashJoinExec.scala:408)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:597)
  at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:111)
  at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:111)
  at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.timeTakenMs(StreamingSymmetricHashJoinExec.scala:127)
  at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.onOutputCompletion$1(StreamingSymmetricHashJoinExec.scala:385)
  at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$processPartitions$25(StreamingSymmetricHashJoinExec.scala:431)
  at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:47)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:36)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
  at org.apache.spark.sql.kafka010.KafkaWriteTask.execute(KafkaWriteTask.scala:51)
  at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$2(KafkaWriter.scala:72)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
  at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$1(KafkaWriter.scala:73)
  at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$1$adapted(KafkaWriter.scala:70)
  at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1020)
  at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1020)
  at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2278)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:131)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  ... 1 more

  at py4j.Protocol.getReturnValue(Protocol.java:476)
  at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
  at com.sun.proxy.$Proxy61.call(Unknown Source)
  at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:55)
  at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:55)
  at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
{code}
With this exception the query finished and went into the completed state. We
then redeployed the job without any change (with the same checkpoints), and it
failed again with a similar exception.
We then cleared the checkpoints specific to this query and redeployed the job
once more. This time it ran without throwing any exception and is currently
running fine in Production; a sketch of the checkpoint cleanup is below.
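For anyone hitting the same issue, the cleanup we performed amounts to
something like the following. This is only a hypothetical sketch: the path is a
placeholder, {{spark}} is assumed to be the active SparkSession, and note that
deleting the checkpoint discards all offsets and join state for the query, so
the sources are reprocessed from scratch.
{code:java}
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical checkpoint location of the affected query only.
val checkpointDir = new Path("/path/to/checkpoint/of/failing/query")
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
if (fs.exists(checkpointDir)) {
  // Recursive delete: removes offsets, commits, and stream-stream join state.
  fs.delete(checkpointDir, true)
}
{code}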
Any update here would be appreciated.
> Spark Structured Streaming: NullPointerException in Stream Stream join
> ----------------------------------------------------------------------
>
> Key: SPARK-31754
> URL: https://issues.apache.org/jira/browse/SPARK-31754
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.4.0
> Environment: Spark Version : 2.4.0
> Hadoop Version : 3.0.0
> Reporter: Puviarasu
> Priority: Major
> Labels: structured-streaming
> Attachments: CodeGen.txt, Excpetion-3.0.0Preview2.txt,
> Logical-Plan.txt
>
>
> When joining 2 streams with watermarking and windowing, we are getting a
> NullPointerException after running for a few minutes.
> After the failure we analyzed the checkpoint offsets/sources and found the
> files for which the application failed. These files do not contain any null
> values in the join columns (a minimal version of this check is sketched
> below).
> We even restarted the job with those files and the application ran. From this
> we concluded that the exception is not caused by the data from the streams.
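> A minimal version of the null check on the suspect files looks like this
> (the file path is a placeholder; the column names match the code below):
>
> {code:java}
> // Batch-read the files referenced by the failed micro-batch and count rows
> // with nulls in the join columns; this returned 0 for the failing files.
> val suspect = spark.read.parquet("/path/to/source1/suspect-file.parquet")
> val nullKeys = suspect.filter("col1 is null or eventTime1 is null").count()
> println(s"rows with null join columns: $nullKeys")
> {code}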
> *Code:*
>
> {code:java}
> import org.apache.spark.sql.streaming.Trigger
>
> val optionsMap1 = Map[String, String](
>   "Path" -> "/path/to/source1", "maxFilesPerTrigger" -> "1",
>   "latestFirst" -> "false", "fileNameOnly" -> "false",
>   "checkpointLocation" -> "/path/to/checkpoint1", "rowsPerSecond" -> "1")
> val optionsMap2 = Map[String, String](
>   "Path" -> "/path/to/source2", "maxFilesPerTrigger" -> "1",
>   "latestFirst" -> "false", "fileNameOnly" -> "false",
>   "checkpointLocation" -> "/path/to/checkpoint2", "rowsPerSecond" -> "1")
>
> spark.readStream.format("parquet").options(optionsMap1).load().createTempView("source1")
> spark.readStream.format("parquet").options(optionsMap2).load().createTempView("source2")
>
> spark.sql("select * from source1 where eventTime1 is not null and col1 is not null")
>   .withWatermark("eventTime1", "30 minutes").createTempView("viewNotNull1")
> spark.sql("select * from source2 where eventTime2 is not null and col2 is not null")
>   .withWatermark("eventTime2", "30 minutes").createTempView("viewNotNull2")
>
> spark.sql("select * from viewNotNull1 a join viewNotNull2 b on a.col1 = b.col2" +
>   " and a.eventTime1 >= b.eventTime2 and a.eventTime1 <= b.eventTime2 + interval 2 hours")
>   .createTempView("join")
>
> val optionsMap3 = Map[String, String]("compression" -> "snappy",
>   "path" -> "/path/to/sink", "checkpointLocation" -> "/path/to/checkpoint3")
> spark.sql("select * from join")
>   .writeStream.outputMode("append")
>   .trigger(Trigger.ProcessingTime("5 seconds"))
>   .format("parquet").options(optionsMap3).start()
> {code}
>
> *Exception:*
>
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Aborting TaskSet 4.0 because task 0 (partition 0) cannot run anywhere due to node and executor blacklist.
> Most recent failure: Lost task 0.2 in stage 4.0 (TID 6, executor 3): java.lang.NullPointerException
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$$anonfun$26.apply(StreamingSymmetricHashJoinExec.scala:412)
>   at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.findNextValueForIndex(SymmetricHashJoinStateManager.scala:197)
>   at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:221)
>   at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$$anon$2.getNext(SymmetricHashJoinStateManager.scala:157)
>   at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>   at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply$mcV$spala:338)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(Stream)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1$1.apply(Stream)
>   at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:583)
>   at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:108)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.timeTakenMs(StreamingSymmetricHashJoinExec.scala:126)
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$onOutputCompletion$1(StreamingSymmetricHashJ
>   at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$$anonfun$org$apache$spark$sql$execution$streaming$StreamingSymmetricHashJoinExec$$processPartitions$1.apply$mcV$sp(St:361)
>   at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
>   at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:624)
>   at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>   at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:216)
>   at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
>   at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>   at org.apache.spark.scheduler.Task.run(Task.scala:121)
>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:407)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1408)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:413)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> Blacklisting behavior can be configured via spark.blacklist.*.
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1890)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1877)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:929)
>   at scala.Option.foreach(Option.scala:257)
>   at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:929)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2111)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2060)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2049)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:740)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:2081)
>   at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
>   ... 19 more
> Exception in thread "main" org.apache.spark.SparkException: Application application_2345 finished with failed status
>   at org.apache.spark.deploy.yarn.Client.run(Client.scala:1158)
>   at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1606)
>   at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:851)
>   at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
>   at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
>   at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
>   at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:926)
>   at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:935)
>   at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}
>