Amit Baghel created SPARK-19768:
-----------------------------------

             Summary: "This query does not support recovering from checkpoint
location" - Error for both aggregate and non-aggregate queries in Structured
Streaming
                 Key: SPARK-19768
                 URL: https://issues.apache.org/jira/browse/SPARK-19768
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.1.0
            Reporter: Amit Baghel


I am running JavaStructuredKafkaWordCount.java with checkpointLocation set. The
output mode is "complete". The relevant code is below.
{code}
    // Generate running word count
    Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
    }, Encoders.STRING()).groupBy("value").count();

    // Start running the query that prints the running counts to the console
    StreamingQuery query = wordCounts.writeStream()
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoint-data")
      .start();
{code}

This example runs successfully and writes data to the checkpoint directory. When I
re-run the program, it throws the following exception:
{code}
Exception in thread "main" org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint-data/offsets to start over.;
        at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219)
        at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
        at com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85)
{code}
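
As a stopgap, the suggestion in the error message (deleting the offsets directory) can be scripted before a re-run. Below is a minimal sketch that uses only java.nio.file. The class name CheckpointCleaner and the method deleteRecursively are hypothetical and are not part of Spark or of the example program. Deleting the directory discards the recorded offsets, so the query starts over instead of recovering, which is the behaviour this issue is reporting.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for illustration only; not part of Spark.
public class CheckpointCleaner {

    // Recursively deletes the given path if it exists.
    // Returns true if something was deleted, false if the path was absent.
    public static boolean deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return false;
        }
        List<Path> ordered;
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order lists children before their parent directory.
            ordered = paths.sorted(Comparator.reverseOrder())
                           .collect(Collectors.toList());
        }
        for (Path p : ordered) {
            Files.delete(p);
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: CheckpointCleaner <offsets-directory>");
            return;
        }
        Path offsets = Paths.get(args[0]);
        boolean deleted = deleteRecursively(offsets);
        System.out.println((deleted ? "deleted " : "nothing to delete at ") + offsets);
    }
}
```

For the run above, the argument would be /tmp/checkpoint-data/offsets, the path named in the exception.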

Then I modified JavaStructuredKafkaWordCount.java to use a non-aggregate query
with output mode "append". Please see the code below.
{code}
    // no aggregations
    Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterator<String> call(String x) {
        return Arrays.asList(x.split(" ")).iterator();
      }
    }, Encoders.STRING()).select("value");

    // append mode with console
    StreamingQuery query = wordCounts.writeStream()
      .outputMode("append")
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoint-data/offsets")
      .start();
{code}

This modified code also runs successfully and writes data to the checkpoint directory.
When I re-run the program, it throws the same exception:
{code}
Exception in thread "main" org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint-data/offsets to start over.;
        at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219)
        at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
        at com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85)
{code}






