[
https://issues.apache.org/jira/browse/SPARK-24315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16616643#comment-16616643
]
Marco Gaido commented on SPARK-24315:
-------------------------------------
[~joeyfezster] it has been a while, so I may be wrong, but IIRC this was
caused by a corrupted checkpoint dir; deleting it and restarting the job
solved the issue.
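In case it helps anyone hitting this, a minimal sketch of what deleting the checkpoint dir means here, assuming the checkpoint lives on HDFS (the path below is a placeholder, not the actual job's location):
{code}
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Placeholder: use the value you passed to option("checkpointLocation", ...)
val checkpointDir = "hdfs:///user/spark/checkpoints/my-query"

// Recursively delete the (possibly corrupted) checkpoint directory.
// Note: this discards all offset/progress information, so the restarted
// query will reprocess data according to its startingOffsets.
val fs = FileSystem.get(new URI(checkpointDir), spark.sparkContext.hadoopConfiguration)
fs.delete(new Path(checkpointDir), true)

// Then restart the streaming job as usual.
{code}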
> Multiple streaming jobs detected error causing job failure
> ----------------------------------------------------------
>
> Key: SPARK-24315
> URL: https://issues.apache.org/jira/browse/SPARK-24315
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.0
> Reporter: Marco Gaido
> Priority: Major
>
> We are running a simple structured streaming job. It reads data from Kafka
> and writes it to HDFS. Unfortunately, at startup the application fails with
> the following error; after a few restarts, it finally starts successfully.
> {code}
> org.apache.spark.sql.streaming.StreamingQueryException: assertion failed: Concurrent update to the log. Multiple streaming jobs detected for 1
> === Streaming Query ===
> ....
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
>   at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> Caused by: java.lang.AssertionError: assertion failed: Concurrent update to the log. Multiple streaming jobs detected for 1
>   at scala.Predef$.assert(Predef.scala:170)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:339)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:338)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>   at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>   at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>   at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>   ... 1 more
> {code}
> We have not set any value for `spark.streaming.concurrentJobs`. Our code
> looks like:
> {code}
> // read from kafka
> .withWatermark("timestamp", "30 minutes")
> .groupBy(window($"timestamp", "1 hour", "30 minutes"), ...).count()
> // simple select of some fields with casts
> .coalesce(1)
> .writeStream
> .trigger(Trigger.ProcessingTime("2 minutes"))
> .option("checkpointLocation", checkpointDir)
> // write to HDFS
> .start()
> .awaitTermination()
> {code}
> This may also be related to a backlog of data in the Kafka topic waiting to
> be processed, so the first batch takes longer than usual (which I think is
> quite common).
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]