Repository: spark
Updated Branches:
  refs/heads/master 95c95b71e -> caed89321
[SPARK-18927][SS] MemorySink for StructuredStreaming can't recover from checkpoint if location is provided in SessionConf

## What changes were proposed in this pull request?

A checkpoint location can be defined for a Structured Streaming query on a per-query basis through the `DataStreamWriter` options, but it can also be provided through SparkSession configurations. The query should be able to recover in both cases when the OutputMode is Complete for MemorySinks.

## How was this patch tested?

Unit tests

Author: Burak Yavuz <brk...@gmail.com>

Closes #16342 from brkyvz/chk-rec.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/caed8932
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/caed8932
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/caed8932

Branch: refs/heads/master
Commit: caed89321fdabe83e46451ca4e968f86481ad500
Parents: 95c95b7
Author: Burak Yavuz <brk...@gmail.com>
Authored: Tue Dec 20 14:19:35 2016 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Dec 20 14:19:35 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/streaming/DataStreamWriter.scala  |  2 +-
 .../test/DataStreamReaderWriterSuite.scala      | 32 +++++++++++++++-----
 2 files changed, 25 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/caed8932/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b3c600a..b7fc336 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -223,7 +223,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       val sink = new MemorySink(df.schema, outputMode)
       val resultDf = Dataset.ofRows(df.sparkSession, new MemoryPlan(sink))
       val chkpointLoc = extraOptions.get("checkpointLocation")
-      val recoverFromChkpoint = chkpointLoc.isDefined && outputMode == OutputMode.Complete()
+      val recoverFromChkpoint = outputMode == OutputMode.Complete()
       val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
         extraOptions.get("queryName"),
         chkpointLoc,

http://git-wip-us.apache.org/repos/asf/spark/blob/caed8932/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index acac0bf..9de3da3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -470,24 +470,22 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     sq.stop()
   }
 
-  test("MemorySink can recover from a checkpoint in Complete Mode") {
+  private def testMemorySinkCheckpointRecovery(chkLoc: String, provideInWriter: Boolean): Unit = {
     import testImplicits._
     val ms = new MemoryStream[Int](0, sqlContext)
     val df = ms.toDF().toDF("a")
-    val checkpointLoc = newMetadataDir
-    val checkpointDir = new File(checkpointLoc, "offsets")
-    checkpointDir.mkdirs()
-    assert(checkpointDir.exists())
     val tableName = "test"
     def startQuery: StreamingQuery = {
-      df.groupBy("a")
+      val writer = df.groupBy("a")
         .count()
         .writeStream
         .format("memory")
         .queryName(tableName)
-        .option("checkpointLocation", checkpointLoc)
         .outputMode("complete")
-        .start()
+      if (provideInWriter) {
+        writer.option("checkpointLocation", chkLoc)
+      }
+      writer.start()
     }
     // no exception here
     val q = startQuery
@@ -513,6 +511,24 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     q2.stop()
   }
 
+  test("MemorySink can recover from a checkpoint in Complete Mode") {
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+    testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = true)
+  }
+
+  test("SPARK-18927: MemorySink can recover from a checkpoint provided in conf in Complete Mode") {
+    val checkpointLoc = newMetadataDir
+    val checkpointDir = new File(checkpointLoc, "offsets")
+    checkpointDir.mkdirs()
+    assert(checkpointDir.exists())
+    withSQLConf(SQLConf.CHECKPOINT_LOCATION.key -> checkpointLoc) {
+      testMemorySinkCheckpointRecovery(checkpointLoc, provideInWriter = false)
+    }
+  }
+
   test("append mode memory sink's do not support checkpoint recovery") {
     import testImplicits._
     val ms = new MemoryStream[Int](0, sqlContext)
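----------------------------------------------------------------------

For illustration, a minimal sketch of the behavior this patch enables, assuming a local SparkSession; the query name and checkpoint paths are illustrative, not taken from the patch. `spark.sql.streaming.checkpointLocation` is the conf key behind `SQLConf.CHECKPOINT_LOCATION`.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[2]")
  .appName("chk-rec-sketch")
  .getOrCreate()

// A small streaming aggregation to feed the memory sink; the rate source
// emits `timestamp` and `value` columns.
val counts = spark.readStream
  .format("rate")
  .load()
  .groupBy("value")
  .count()

// Option 1: per-query checkpoint location via the DataStreamWriter option.
// Recovery from this checkpoint already worked before the patch.
val q1 = counts.writeStream
  .format("memory")
  .queryName("counts_table")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/counts-chk")
  .start()
q1.stop()

// Option 2: session-wide checkpoint root via the SQL conf (the query name
// is appended under this root).
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/chk-root")

def startFromConf() = counts.writeStream
  .format("memory")
  .queryName("counts_table")
  .outputMode("complete")
  .start()

val q2 = startFromConf()
q2.stop()
// Before the patch this restart failed, because recovery was gated on the
// writer option being present; after the patch, Complete mode alone is
// sufficient and the query resumes from its conf-derived checkpoint.
val q3 = startFromConf()
q3.stop()

The difference the patch makes is confined to the last step: with the checkpoint root coming only from the session conf rather than the writer option, the restart no longer fails in Complete mode.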