[ https://issues.apache.org/jira/browse/SPARK-24315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16616643#comment-16616643 ]

Marco Gaido commented on SPARK-24315:
-------------------------------------

[~joeyfezster] it has been a while, so I may be wrong, but IIRC this was caused 
by a corrupted checkpoint dir; deleting it and restarting the job solved the 
issue.
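
For anyone landing here in the same state, a minimal, hypothetical sketch of how 
one might remove the checkpoint directory from Scala before restarting the query 
(the path below is a placeholder, and in a real job you would typically pass 
spark.sparkContext.hadoopConfiguration rather than building a fresh one):

{code}
// Hypothetical sketch: remove a (possibly corrupted) checkpoint directory before
// restarting the query. The path is a placeholder, not taken from this issue.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val checkpointPath = new Path("hdfs:///user/spark/checkpoints/my-query")  // placeholder
val fs: FileSystem = checkpointPath.getFileSystem(new Configuration())

if (fs.exists(checkpointPath)) {
  // recursive = true removes offsets/, commits/ and state/ under the checkpoint
  fs.delete(checkpointPath, true)
}
{code}

Note that dropping the checkpoint also discards the committed offsets, so on 
restart the Kafka source falls back to its startingOffsets option.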

> Multiple streaming jobs detected error causing job failure
> ----------------------------------------------------------
>
>                 Key: SPARK-24315
>                 URL: https://issues.apache.org/jira/browse/SPARK-24315
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Marco Gaido
>            Priority: Major
>
> We are running a simple structured streaming job. It reads data from Kafka 
> and writes it to HDFS. Unfortunately, at startup the application fails with 
> the following error; after a few restarts it eventually starts successfully.
> {code}
> org.apache.spark.sql.streaming.StreamingQueryException: assertion failed: 
> Concurrent update to the log. Multiple streaming jobs detected for 1
> === Streaming Query ===
> ....
> at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> Caused by: java.lang.AssertionError: assertion failed: Concurrent update to 
> the log. Multiple streaming jobs detected for 1
>         at scala.Predef$.assert(Predef.scala:170)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:339)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:338)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
>         at
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>         at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>         ... 1 more
> {code}
> We have not set any value for `spark.streaming.concurrentJobs`. Our code 
> looks like:
> {code}
>           // read from kafka
>           .withWatermark("timestamp", "30 minutes")
>           .groupBy(window($"timestamp", "1 hour", "30 minutes"), ...).count()
>           // simple select of some fields with casts
>           .coalesce(1)
>           .writeStream
>           .trigger(Trigger.ProcessingTime("2 minutes"))
>           .option("checkpointLocation", checkpointDir)
>           // write to HDFS
>           .start()
>           .awaitTermination()
> {code}
> This may also be related to a backlog of data sitting in the Kafka topic, so 
> the first batch takes longer than usual (which I think is a fairly common 
> situation).
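
For context, a fuller (purely illustrative) version of a pipeline like the one 
quoted above might look as follows; the broker, topic, grouping column, sink 
format, and paths are placeholders rather than details from the report:

{code}
// Illustrative expansion of the reported pipeline; every value marked
// "placeholder" is invented for the example and not taken from the report.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("kafka-to-hdfs").getOrCreate()
import spark.implicits._

val counts = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // placeholder brokers
  .option("subscribe", "events")                      // placeholder topic
  .load()
  .selectExpr("CAST(value AS STRING) AS value", "timestamp")
  .withWatermark("timestamp", "30 minutes")
  .groupBy(window($"timestamp", "1 hour", "30 minutes"), $"value")  // placeholder key
  .count()
  .coalesce(1)

counts.writeStream
  .format("parquet")                                          // placeholder sink
  .option("path", "hdfs:///data/output")                      // placeholder path
  .option("checkpointLocation", "hdfs:///spark/checkpoints")  // placeholder path
  .trigger(Trigger.ProcessingTime("2 minutes"))
  .outputMode("append")
  .start()
  .awaitTermination()
{code}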


