Repository: spark
Updated Branches:
  refs/heads/master 7e418e99c -> 976f3b122


[SPARK-17513][SQL] Make StreamExecution garbage-collect its metadata

## What changes were proposed in this pull request?
This PR modifies StreamExecution such that it discards metadata for batches 
that have already been fully processed. I used the purge method that was added 
as part of SPARK-17235.

This is a resubmission of 15126, which was based on work by frreiss in #15067, 
but fixed the test case along with some typos.

## How was this patch tested?
A new test case in StreamingQuerySuite. The test case would fail without the 
changes in this pull request.

Author: petermaxlee <petermax...@gmail.com>

Closes #15166 from petermaxlee/SPARK-17513-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/976f3b12
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/976f3b12
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/976f3b12

Branch: refs/heads/master
Commit: 976f3b1227c1a9e0b878e010531285fdba57b6a7
Parents: 7e418e9
Author: petermaxlee <petermax...@gmail.com>
Authored: Tue Sep 20 19:08:07 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Sep 20 19:08:07 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/streaming/MetadataLog.scala   |  1 +
 .../execution/streaming/StreamExecution.scala   |  7 ++++++
 .../sql/streaming/StreamingQuerySuite.scala     | 24 ++++++++++++++++++++
 3 files changed, 32 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/976f3b12/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
index 78d6be1..9e2604c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLog.scala
@@ -24,6 +24,7 @@ package org.apache.spark.sql.execution.streaming
  *  - Allow the user to query the latest batch id.
  *  - Allow the user to query the metadata object of a specified batch id.
  *  - Allow the user to query metadata objects in a range of batch ids.
+ *  - Allow the user to remove obsolete metadata
  */
 trait MetadataLog[T] {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/976f3b12/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index a1aae61..220f77d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -290,6 +290,13 @@ class StreamExecution(
       assert(offsetLog.add(currentBatchId, 
availableOffsets.toCompositeOffset(sources)),
         s"Concurrent update to the log. Multiple streaming jobs detected for 
$currentBatchId")
       logInfo(s"Committed offsets for batch $currentBatchId.")
+
+      // Now that we have logged the new batch, no further processing will 
happen for
+      // the previous batch, and it is safe to discard the old metadata.
+      // Note that purge is exclusive, i.e. it purges everything before 
currentBatchId.
+      // NOTE: If StreamExecution implements pipeline parallelism (multiple 
batches in
+      // flight at the same time), this cleanup logic will need to change.
+      offsetLog.purge(currentBatchId)
     } else {
       awaitBatchLock.lock()
       try {

http://git-wip-us.apache.org/repos/asf/spark/blob/976f3b12/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 9d58315..88f1f18 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -125,6 +125,30 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter {
     )
   }
 
+  testQuietly("StreamExecution metadata garbage collection") {
+    val inputData = MemoryStream[Int]
+    val mapped = inputData.toDS().map(6 / _)
+
+    // Run 3 batches, and then assert that only 1 metadata file is left at the 
end
+    // since the first 2 should have been purged.
+    testStream(mapped)(
+      AddData(inputData, 1, 2),
+      CheckAnswer(6, 3),
+      AddData(inputData, 1, 2),
+      CheckAnswer(6, 3, 6, 3),
+      AddData(inputData, 4, 6),
+      CheckAnswer(6, 3, 6, 3, 1, 1),
+
+      AssertOnQuery("metadata log should contain only one file") { q =>
+        val metadataLogDir = new 
java.io.File(q.offsetLog.metadataPath.toString)
+        val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
+        val toTest = logFileNames.filter(! _.endsWith(".crc"))  // Workaround 
for SPARK-17475
+        assert(toTest.size == 1 && toTest.head == "2")
+        true
+      }
+    )
+  }
+
   /**
    * A [[StreamAction]] to test the behavior of 
`StreamingQuery.awaitTermination()`.
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to