[
https://issues.apache.org/jira/browse/SPARK-20325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hyukjin Kwon updated SPARK-20325:
---------------------------------
Issue Type: Documentation (was: Bug)
> Spark Structured Streaming documentation Update: checkpoint configuration
> -------------------------------------------------------------------------
>
> Key: SPARK-20325
> URL: https://issues.apache.org/jira/browse/SPARK-20325
> Project: Spark
> Issue Type: Documentation
> Components: Structured Streaming
> Affects Versions: 2.1.0
> Reporter: Kate Eri
> Priority: Minor
>
> I have configured the following stream outputting to Kafka:
> {code}
> map.foreach(metric => {
>   streamToProcess
>     .groupBy(metric)
>     .agg(count(metric))
>     .writeStream
>     .outputMode("complete")
>     .option("checkpointLocation", checkpointDir)
>     .foreach(kafkaWriter)
>     .start()
> })
> {code}
> I configured the checkpoint directory for each output sink with
> .option("checkpointLocation", checkpointDir), following the documentation:
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
> As a result, I got the following exception:
> Cannot start query with id bf6a1003-6252-4c62-8249-c6a189701255 as another
> query with same id is already active. Perhaps you are attempting to restart a
> query from checkpoint that is already active.
> java.lang.IllegalStateException: Cannot start query with id
> bf6a1003-6252-4c62-8249-c6a189701255 as another query with same id is already
> active. Perhaps you are attempting to restart a query from checkpoint that is
> already active.
> at
> org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:291)
> So, according to the current Spark logic, the checkpoint location for the
> “foreach” sink is resolved in the following way:
> {code:title=StreamingQueryManager.scala}
> val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
>   new Path(userSpecified).toUri.toString
> }.orElse {
>   df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
>     new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
>   }
> }.getOrElse {
>   if (useTempCheckpointLocation) {
>     Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
>   } else {
>     throw new AnalysisException(
>       "checkpointLocation must be specified either " +
>         """through option("checkpointLocation", ...) or """ +
>         s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
>   }
> }
> {code}
> So Spark first takes the checkpoint location from the query option, then from
> the SparkSession configuration (spark.sql.streaming.checkpointLocation), and
> finally falls back to a temporary directory or throws an AnalysisException.
> This behavior is not documented, hence two questions:
> 1) Could we update the Structured Streaming documentation to describe this
> behavior?
> 2) Do we really need to specify the checkpoint directory per query? What is
> the reason for this? In the end we will be forced to write some checkpointDir
> name generator, for example one that associates the directory with a
> particular named query.
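
The lookup order in the quoted StreamingQueryManager snippet can be modelled without Spark as a small pure function. The following is only a sketch under stated assumptions: the object name `CheckpointResolution`, the function `resolve`, its parameter names, and the helper `perQueryDir` are all hypothetical, `java.nio.file` stands in for Hadoop's `Path`, and a plain `IllegalArgumentException` stands in for `AnalysisException`. It is not Spark's implementation.

```scala
import java.nio.file.{Files, Paths}
import java.util.UUID

// Simplified, Spark-free model of the lookup order quoted above.
// All names here are illustrative assumptions, not Spark APIs.
object CheckpointResolution {

  def resolve(
      queryOption: Option[String],    // value of .option("checkpointLocation", ...)
      sessionDefault: Option[String], // value of spark.sql.streaming.checkpointLocation
      queryName: Option[String],      // user-specified query name, if any
      useTempCheckpointLocation: Boolean): String =
    queryOption
      .orElse {
        // The session-level default is a root; each query gets a subdirectory
        // named after the query, or a random UUID when the query is unnamed.
        sessionDefault.map { root =>
          Paths.get(root, queryName.getOrElse(UUID.randomUUID().toString)).toString
        }
      }
      .getOrElse {
        if (useTempCheckpointLocation) {
          Files.createTempDirectory("temporary").toString
        } else {
          throw new IllegalArgumentException("checkpointLocation must be specified")
        }
      }

  // Hypothetical "name generator" of the kind question 2 mentions:
  // one checkpoint directory per named query under a common root.
  def perQueryDir(root: String, queryName: String): String =
    Paths.get(root, queryName).toString
}
```

In the model, a per-query option always wins, so passing the same `checkpointDir` to every query in the loop gives all of them the same location. Assuming `metric` is a String, passing something like `perQueryDir(checkpointDir, metric)` to `.option("checkpointLocation", ...)` would give each query its own directory. Going by the quoted snippet, setting only `spark.sql.streaming.checkpointLocation` and naming each query should have a similar effect.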