Repository: spark
Updated Branches:
  refs/heads/branch-2.0 de545e7c8 -> e2452c632


[SPARK-18337] Complete mode memory sinks should be able to recover from checkpoints

## What changes were proposed in this pull request?

It would be nice if memory sinks could also recover from checkpoints. For 
correctness reasons, the only time we should support this is in `Complete` 
OutputMode: in Complete mode the full result table is re-emitted on every 
trigger from the StateStore, whose contents are already persisted in the 
checkpoint directory, so the in-memory table can be rebuilt exactly after a 
restart.
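
For illustration, a minimal sketch of the user-facing behavior this enables (the socket source, query name, and checkpoint path below are hypothetical, not taken from the patch):

```scala
// Illustrative only: the source, query name, and checkpoint path are
// hypothetical and not part of this patch.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("memory-sink-recovery").getOrCreate()
import spark.implicits._

// Any streaming source works; a socket source is used here for simplicity.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

val counts = lines.groupBy($"value").count()

val query = counts.writeStream
  .format("memory")                                    // in-memory sink, queryable via spark.table
  .queryName("word_counts")                            // name of the backing temp view
  .outputMode("complete")                              // recovery is only allowed in Complete mode
  .option("checkpointLocation", "/tmp/mem-sink-ckpt")  // explicit checkpoint directory
  .start()
```

Stopping the query and issuing the same `writeStream` call against the identical `checkpointLocation` now resumes from the checkpoint and repopulates `spark.table("word_counts")`; in `Append` mode the same call still fails with an `AnalysisException`, as exercised by the second test below.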

## How was this patch tested?

Unit tests in `DataStreamReaderWriterSuite`

Author: Burak Yavuz <[email protected]>

Closes #15801 from brkyvz/mem-stream.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2452c63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2452c63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2452c63

Branch: refs/heads/branch-2.0
Commit: e2452c6326d70d45b0354ebf0169d14ef739ba73
Parents: de545e7
Author: Burak Yavuz <[email protected]>
Authored: Tue Nov 15 13:09:29 2016 -0800
Committer: Tathagata Das <[email protected]>
Committed: Tue Nov 15 13:26:27 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/streaming/DataStreamWriter.scala  |  6 +-
 .../test/DataStreamReaderWriterSuite.scala      | 65 ++++++++++++++++++++
 2 files changed, 69 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2452c63/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b959444..daed1dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -222,14 +222,16 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
 
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
+      val chkpointLoc = extraOptions.get("checkpointLocation")
+      val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete()
       val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
-        extraOptions.get("checkpointLocation"),
+        chkpointLoc,
         df,
         sink,
         outputMode,
         useTempCheckpointLocation = true,
-        recoverFromCheckpointLocation = false,
+        recoverFromCheckpointLocation = recoverFromChkpoint,
         trigger = trigger)
       resultDf.createOrReplaceTempView(query.name)
       query

http://git-wip-us.apache.org/repos/asf/spark/blob/e2452c63/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index f099439..5630464 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.streaming.test
 
+import java.io.File
 import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration._
@@ -467,4 +468,68 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     val sq = df.writeStream.format("console").start()
     sq.stop()
   }
+
+  test("MemorySink can recover from a checkpoint in Complete Mode") {
+    import testImplicits._
+    val ms = new MemoryStream[Int](0, sqlContext)
+    val df = ms.toDF().toDF("a")
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+    val tableName = "test"
+    def startQuery: StreamingQuery = {
+      df.groupBy("a")
+        .count()
+        .writeStream
+        .format("memory")
+        .queryName(tableName)
+        .option("checkpointLocation", checkpointLoc)
+        .outputMode("complete")
+        .start()
+    }
+    // no exception here
+    val q = startQuery
+    ms.addData(0, 1)
+    q.processAllAvailable()
+    q.stop()
+
+    checkAnswer(
+      spark.table(tableName),
+      Seq(Row(0, 1), Row(1, 1))
+    )
+    spark.sql(s"drop table $tableName")
+    // verify table is dropped
+    intercept[AnalysisException](spark.table(tableName).collect())
+    val q2 = startQuery
+    ms.addData(0)
+    q2.processAllAvailable()
+    checkAnswer(
+      spark.table(tableName),
+      Seq(Row(0, 2), Row(1, 1))
+    )
+
+    q2.stop()
+  }
+
+  test("append mode memory sink's do not support checkpoint recovery") {
+    import testImplicits._
+    val ms = new MemoryStream[Int](0, sqlContext)
+    val df = ms.toDF().toDF("a")
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+
+    val e = intercept[AnalysisException] {
+      df.writeStream
+        .format("memory")
+        .queryName("test")
+        .option("checkpointLocation", checkpointLoc)
+        .outputMode("append")
+        .start()
+    }
+    assert(e.getMessage.contains("does not support recovering"))
+    assert(e.getMessage.contains("checkpoint location"))
+  }
 }

