[ https://issues.apache.org/jira/browse/SPARK-19768?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Amit Baghel updated SPARK-19768:
--------------------------------
Description:
I am running the JavaStructuredKafkaWordCount.java example with a
checkpointLocation. The output mode is "complete". The relevant code is below.
{code}
// Generate running word count
Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public Iterator<String> call(String x) {
    return Arrays.asList(x.split(" ")).iterator();
  }
}, Encoders.STRING()).groupBy("value").count();

// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint-data")
  .start();
{code}
This example runs successfully and writes data to the checkpoint directory.
When I re-run the program, it throws the exception below.
{code}
Exception in thread "main" org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint-data/offsets to start over.;
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
	at com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85)
{code}
Then I modified JavaStructuredKafkaWordCount.java to run a non-aggregate query
with the output mode set to "append". Please see the code below.
{code}
// No aggregations
Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
  @Override
  public Iterator<String> call(String x) {
    return Arrays.asList(x.split(" ")).iterator();
  }
}, Encoders.STRING()).select("value");

// Append mode with the console sink
StreamingQuery query = wordCounts.writeStream()
  .outputMode("append")
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoint-data")
  .start();
{code}
This modified code also runs successfully and writes data to the checkpoint
directory. When I re-run the program, it throws the same exception.
{code}
Exception in thread "main" org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete /tmp/checkpoint-data/offsets to start over.;
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:219)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
	at com.spark.JavaStructuredKafkaWordCount.main(JavaStructuredKafkaWordCount.java:85)
{code}
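For anyone reproducing this, the exception message itself suggests deleting /tmp/checkpoint-data/offsets before starting over. The following is a minimal sketch of that step only, assuming the checkpoint directory is on the local filesystem. The class name CheckpointReset and the method deleteOffsets are hypothetical helpers, not part of Spark. Note that this discards the recorded offsets, so it probably only lets the program start again and does not make the query recover from the checkpoint.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not a Spark API): removes the "offsets" directory
// that the AnalysisException message asks to delete before a restart.
public class CheckpointReset {

    // Recursively deletes <checkpointDir>/offsets and returns the number of
    // files and directories removed (0 if the directory does not exist).
    static int deleteOffsets(Path checkpointDir) throws IOException {
        Path offsets = checkpointDir.resolve("offsets");
        if (!Files.exists(offsets)) {
            return 0;
        }
        List<Path> entries;
        try (Stream<Path> walk = Files.walk(offsets)) {
            // Reverse order so that children are deleted before their parents.
            entries = walk.sorted(Comparator.reverseOrder())
                          .collect(Collectors.toList());
        }
        for (Path entry : entries) {
            Files.delete(entry);
        }
        return entries.size();
    }

    public static void main(String[] args) throws IOException {
        // Default path taken from the report; assumed to be a local directory.
        Path checkpointDir =
            Paths.get(args.length > 0 ? args[0] : "/tmp/checkpoint-data");
        int removed = deleteOffsets(checkpointDir);
        System.out.println("Removed " + removed + " entries under "
            + checkpointDir.resolve("offsets"));
    }
}
```

Running this before each re-run would be a workaround at best. The question in this report, why a query started with an explicit checkpointLocation cannot be restarted from it, remains open.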
> Error for both aggregate and non-aggregate queries in Structured Streaming - "This query does not support recovering from checkpoint location"
> ------------------------------------------------------------------------
>
> Key: SPARK-19768
> URL: https://issues.apache.org/jira/browse/SPARK-19768
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.1.0
> Reporter: Amit Baghel