Repository: spark Updated Branches: refs/heads/master 6f9e598cc -> 2afdaa980
[SPARK-18337] Complete mode memory sinks should be able to recover from checkpoints ## What changes were proposed in this pull request? It would be nice if memory sinks could also recover from checkpoints. For correctness reasons, the only time we should support this is in the `Complete` output mode. We can support it there because the output of the StateStore is already persisted in the checkpoint directory. ## How was this patch tested? Unit tests Author: Burak Yavuz <[email protected]> Closes #15801 from brkyvz/mem-stream. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2afdaa98 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2afdaa98 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2afdaa98 Branch: refs/heads/master Commit: 2afdaa9805f44b45242978eab9a9623d31dddbf3 Parents: 6f9e598 Author: Burak Yavuz <[email protected]> Authored: Tue Nov 15 13:09:29 2016 -0800 Committer: Tathagata Das <[email protected]> Committed: Tue Nov 15 13:09:29 2016 -0800 ---------------------------------------------------------------------- .../spark/sql/streaming/DataStreamWriter.scala | 6 +- .../test/DataStreamReaderWriterSuite.scala | 65 ++++++++++++++++++++ 2 files changed, 69 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2afdaa98/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index b959444..daed1dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -222,14 +222,16 @@ final class DataStreamWriter[T] 
private[sql](ds: Dataset[T]) { val sink = new MemorySink(df.schema, outputMode) val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink)) + val chkpointLoc = extraOptions.get("checkpointLocation") + val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete() val query = df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), - extraOptions.get("checkpointLocation"), + chkpointLoc, df, sink, outputMode, useTempCheckpointLocation = true, - recoverFromCheckpointLocation = false, + recoverFromCheckpointLocation = recoverFromChkpoint, trigger = trigger) resultDf.createOrReplaceTempView(query.name) query http://git-wip-us.apache.org/repos/asf/spark/blob/2afdaa98/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala index f099439..5630464 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.streaming.test +import java.io.File import java.util.concurrent.TimeUnit import scala.concurrent.duration._ @@ -467,4 +468,68 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter { val sq = df.writeStream.format("console").start() sq.stop() } + + test("MemorySink can recover from a checkpoint in Complete Mode") { + import testImplicits._ + val ms = new MemoryStream[Int](0, sqlContext) + val df = ms.toDF().toDF("a") + val checkpointLoc = newMetadataDir + val checkpointDir = new File(checkpointLoc, "offsets") + checkpointDir.mkdirs() + assert(checkpointDir.exists()) + val tableName = 
"test" + def startQuery: StreamingQuery = { + df.groupBy("a") + .count() + .writeStream + .format("memory") + .queryName(tableName) + .option("checkpointLocation", checkpointLoc) + .outputMode("complete") + .start() + } + // no exception here + val q = startQuery + ms.addData(0, 1) + q.processAllAvailable() + q.stop() + + checkAnswer( + spark.table(tableName), + Seq(Row(0, 1), Row(1, 1)) + ) + spark.sql(s"drop table $tableName") + // verify table is dropped + intercept[AnalysisException](spark.table(tableName).collect()) + val q2 = startQuery + ms.addData(0) + q2.processAllAvailable() + checkAnswer( + spark.table(tableName), + Seq(Row(0, 2), Row(1, 1)) + ) + + q2.stop() + } + + test("append mode memory sink's do not support checkpoint recovery") { + import testImplicits._ + val ms = new MemoryStream[Int](0, sqlContext) + val df = ms.toDF().toDF("a") + val checkpointLoc = newMetadataDir + val checkpointDir = new File(checkpointLoc, "offsets") + checkpointDir.mkdirs() + assert(checkpointDir.exists()) + + val e = intercept[AnalysisException] { + df.writeStream + .format("memory") + .queryName("test") + .option("checkpointLocation", checkpointLoc) + .outputMode("append") + .start() + } + assert(e.getMessage.contains("does not support recovering")) + assert(e.getMessage.contains("checkpoint location")) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
