[
https://issues.apache.org/jira/browse/SPARK-20325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15970255#comment-15970255
]
Hyukjin Kwon edited comment on SPARK-20325 at 4/16/17 6:30 AM:
---------------------------------------------------------------
It sounds like a documentation issue for ...
{quote}
Could we update the Structured Streaming documentation to describe this
behavior?
{quote}
I think this question should go to the mailing list.
{quote}
Do we really need to specify the checkpoint dir per query? What is the reason for
this? In the end, will we be forced to write some checkpointDir name generator, for
example one that associates each directory with a particular named query?
{quote}
was (Author: hyukjin.kwon):
It sounds like a documentation issue for ...
{quote}
Could we update the Structured Streaming documentation to describe this
behavior?
{quote}
{quote}
Do we really need to specify the checkpoint dir per query? What is the reason for
this? In the end, will we be forced to write some checkpointDir name generator, for
example one that associates each directory with a particular named query?
{quote}
I think this question should go to the mailing list.
> Spark Structured Streaming documentation Update: checkpoint configuration
> -------------------------------------------------------------------------
>
> Key: SPARK-20325
> URL: https://issues.apache.org/jira/browse/SPARK-20325
> Project: Spark
> Issue Type: Documentation
> Components: Structured Streaming
> Affects Versions: 2.1.0
> Reporter: Kate Eri
> Priority: Minor
>
> I have configured the following stream outputting to Kafka:
> {code}
> map.foreach(metric => {
>   streamToProcess
>     .groupBy(metric)
>     .agg(count(metric))
>     .writeStream
>     .outputMode("complete")
>     .option("checkpointLocation", checkpointDir)
>     .foreach(kafkaWriter)
>     .start()
> })
> {code}
> I configured the checkpoint directory for each output sink with
> .option("checkpointLocation", checkpointDir), following the documentation:
> http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
>
> As a result I've got the following exception:
> Cannot start query with id bf6a1003-6252-4c62-8249-c6a189701255 as another
> query with same id is already active. Perhaps you are attempting to restart a
> query from checkpoint that is already active.
> java.lang.IllegalStateException: Cannot start query with id
> bf6a1003-6252-4c62-8249-c6a189701255 as another query with same id is already
> active. Perhaps you are attempting to restart a query from checkpoint that is
> already active.
> at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:291)
> According to the current Spark logic, the checkpoint location for the
> “foreach” sink is resolved in the following way:
> {code:title=StreamingQueryManager.scala}
> val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
>   new Path(userSpecified).toUri.toString
> }.orElse {
>   df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
>     new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
>   }
> }.getOrElse {
>   if (useTempCheckpointLocation) {
>     Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
>   } else {
>     throw new AnalysisException(
>       "checkpointLocation must be specified either " +
>         """through option("checkpointLocation", ...) or """ +
>         s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
>   }
> }
> {code}
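> So Spark first takes the checkpoint location from the query option, then from
> the SparkSession configuration (spark.sql.streaming.checkpointLocation), and
> otherwise either uses a temporary directory or throws an AnalysisException.
> This behavior is not documented, hence two questions: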
> 1) Could we update the Structured Streaming documentation to describe this
> behavior?
> 2) Do we really need to specify the checkpoint dir per query? What is the
> reason for this? In the end, will we be forced to write some checkpointDir
> name generator, for example one that associates each directory with a
> particular named query?
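To make the precedence described above concrete, here is a minimal sketch that models it without any Spark dependency. It is only an illustration, not the Spark implementation: the `CheckpointPrecedence` object, the `resolve` function, the metric names and the `/ckpt` path are all hypothetical, and plain string concatenation stands in for Hadoop's `Path`. It returns `None` in the case where the quoted snippet would use a temporary directory or throw.

```scala
import java.util.UUID

object CheckpointPrecedence {
  // Simplified, Spark-free model of the precedence in the quoted snippet.
  // Returns None where Spark would fall back to a temp dir or throw.
  def resolve(
      queryOption: Option[String], // value of .option("checkpointLocation", ...)
      sessionConf: Option[String], // value of spark.sql.streaming.checkpointLocation
      queryName: Option[String]    // user-specified query name, if any
  ): Option[String] =
    queryOption.orElse {
      sessionConf.map { base =>
        // As in the snippet: the query name, or a random UUID when unnamed.
        val child = queryName.getOrElse(UUID.randomUUID().toString)
        s"${base.stripSuffix("/")}/$child"
      }
    }

  def main(args: Array[String]): Unit = {
    val metrics = Seq("clicks", "views") // hypothetical metric names

    // Same option value for every query: all queries resolve to one location.
    val shared = metrics.map(_ => resolve(Some("/ckpt"), None, None))
    println(shared.distinct.size) // prints 1

    // Session-level base plus a query name: one subdirectory per query.
    val perQuery = metrics.map(m => resolve(None, Some("/ckpt"), Some(m)))
    println(perQuery) // prints List(Some(/ckpt/clicks), Some(/ckpt/views))
  }
}
```

If this model matches the real behavior, then setting spark.sql.streaming.checkpointLocation once and giving each query its own name (for example via .queryName(metric)) should avoid a hand-written directory generator. Note that an unnamed query would get a fresh random UUID on each start, so it presumably would not pick up its earlier checkpoint after a restart.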