Repository: spark
Updated Branches:
  refs/heads/branch-1.2 076de46f2 -> bd70ff99e


[SPARK-4790][STREAMING] Fix ReceivedBlockTrackerSuite to wait for old files
to get deleted before continuing.

Because the deletes happen asynchronously, the getFileStatus call might throw
an exception in older HDFS versions if a file is deleted between the time
listFiles is called on the directory and the time getFileStatus is called on
that file.

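This PR addresses this by adding a waitForCompletion option to the cleanup
methods. When it is set to true, the caller blocks until the deletion of the
old files has completed before proceeding; otherwise the files are still
deleted asynchronously.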

Author: Hari Shreedharan <[email protected]>

Closes #3726 from harishreedharan/spark-4790 and squashes the following commits:

bbbacd1 [Hari Shreedharan] Call cleanUpOldLogs only once in the tests.
3255f17 [Hari Shreedharan] Add test for async deletion. Remove method from 
ReceiverTracker that does not take waitForCompletion.
e4c83ec [Hari Shreedharan] Making waitForCompletion a mandatory param. Remove 
eventually from WALSuite since the cleanup method returns only after all files 
are deleted.
af00fd1 [Hari Shreedharan] [SPARK-4790][STREAMING] Fix 
ReceivedBlockTrackerSuite waits for old files to get deleted before continuing.

(cherry picked from commit 3610d3c615112faef98d94f04efaea602cc4aa8f)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bd70ff99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bd70ff99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bd70ff99

Branch: refs/heads/branch-1.2
Commit: bd70ff99e82571b3827bd5876087d7aa81283b97
Parents: 076de46
Author: Hari Shreedharan <[email protected]>
Authored: Wed Dec 31 14:35:07 2014 -0800
Committer: Tathagata Das <[email protected]>
Committed: Wed Dec 31 14:35:25 2014 -0800

----------------------------------------------------------------------
 .../streaming/receiver/ReceivedBlockHandler.scala |  8 ++++----
 .../scheduler/ReceivedBlockTracker.scala          |  9 ++++++---
 .../streaming/scheduler/ReceiverTracker.scala     |  2 +-
 .../streaming/util/WriteAheadLogManager.scala     | 17 +++++++++++++----
 .../streaming/ReceivedBlockHandlerSuite.scala     |  2 +-
 .../streaming/ReceivedBlockTrackerSuite.scala     |  2 +-
 .../spark/streaming/util/WriteAheadLogSuite.scala | 18 ++++++++++++++++--
 7 files changed, 42 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bd70ff99/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
index 8b97db8..f7a8ebe 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala
@@ -42,7 +42,7 @@ private[streaming] trait ReceivedBlockHandler {
   def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): 
ReceivedBlockStoreResult
 
   /** Cleanup old blocks older than the given threshold time */
-  def cleanupOldBlock(threshTime: Long)
+  def cleanupOldBlocks(threshTime: Long)
 }
 
 
@@ -82,7 +82,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
     BlockManagerBasedStoreResult(blockId)
   }
 
-  def cleanupOldBlock(threshTime: Long) {
+  def cleanupOldBlocks(threshTime: Long) {
     // this is not used as blocks inserted into the BlockManager are cleared 
by DStream's clearing
     // of BlockRDDs.
   }
@@ -192,8 +192,8 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
     WriteAheadLogBasedStoreResult(blockId, segment)
   }
 
-  def cleanupOldBlock(threshTime: Long) {
-    logManager.cleanupOldLogs(threshTime)
+  def cleanupOldBlocks(threshTime: Long) {
+    logManager.cleanupOldLogs(threshTime, waitForCompletion = false)
   }
 
   def stop() {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd70ff99/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
index 02758e0..2ce458c 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
@@ -139,14 +139,17 @@ private[streaming] class ReceivedBlockTracker(
     getReceivedBlockQueue(streamId).toSeq
   }
 
-  /** Clean up block information of old batches. */
-  def cleanupOldBatches(cleanupThreshTime: Time): Unit = synchronized {
+  /**
+   * Clean up block information of old batches. If waitForCompletion is true, 
this method
+   * returns only after the files are cleaned up.
+   */
+  def cleanupOldBatches(cleanupThreshTime: Time, waitForCompletion: Boolean): 
Unit = synchronized {
     assert(cleanupThreshTime.milliseconds < clock.currentTime())
     val timesToCleanup = timeToAllocatedBlocks.keys.filter { _ < 
cleanupThreshTime }.toSeq
     logInfo("Deleting batches " + timesToCleanup)
     writeToLog(BatchCleanupEvent(timesToCleanup))
     timeToAllocatedBlocks --= timesToCleanup
-    logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds))
+    logManagerOption.foreach(_.cleanupOldLogs(cleanupThreshTime.milliseconds, 
waitForCompletion))
     log
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bd70ff99/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 1f0e442..8dbb42a 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -121,7 +121,7 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Boolean = false
 
     /** Clean up metadata older than the given threshold time */
   def cleanupOldMetadata(cleanupThreshTime: Time) {
-    receivedBlockTracker.cleanupOldBatches(cleanupThreshTime)
+    receivedBlockTracker.cleanupOldBatches(cleanupThreshTime, 
waitForCompletion = false)
   }
 
   /** Register a receiver */

http://git-wip-us.apache.org/repos/asf/spark/blob/bd70ff99/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
index 70d2343..166661b 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala
@@ -19,11 +19,11 @@ package org.apache.spark.streaming.util
 import java.nio.ByteBuffer
 
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Future}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 import WriteAheadLogManager._
@@ -124,8 +124,12 @@ private[streaming] class WriteAheadLogManager(
    * files, which is usually based on the local system time. So if there is 
coordination necessary
    * between the node calculating the threshTime (say, driver node), and the 
local system time
    * (say, worker node), the caller has to take account of possible time skew.
+   *
+   * If waitForCompletion is set to true, this method will return only after 
old logs have been
+   * deleted. This should be set to true only for testing. Else the files will 
be deleted
+   * asynchronously.
    */
-  def cleanupOldLogs(threshTime: Long): Unit = {
+  def cleanupOldLogs(threshTime: Long, waitForCompletion: Boolean): Unit = {
     val oldLogFiles = synchronized { pastLogs.filter { _.endTime < threshTime 
} }
     logInfo(s"Attempting to clear ${oldLogFiles.size} old log files in 
$logDirectory " +
       s"older than $threshTime: ${oldLogFiles.map { _.path }.mkString("\n")}")
@@ -146,10 +150,15 @@ private[streaming] class WriteAheadLogManager(
       logInfo(s"Cleared log files in $logDirectory older than $threshTime")
     }
     if (!executionContext.isShutdown) {
-      Future { deleteFiles() }
+      val f = Future { deleteFiles() }
+      if (waitForCompletion) {
+        import scala.concurrent.duration._
+        Await.ready(f, 1 second)
+      }
     }
   }
 
+
   /** Stop the manager, close any open log writer */
   def stop(): Unit = synchronized {
     if (currentLogWriter != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/bd70ff99/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 3661e16..132ff24 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -168,7 +168,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with 
BeforeAndAfter with Matche
       manualClock.currentTime() shouldEqual 5000L
 
       val cleanupThreshTime = 3000L
-      handler.cleanupOldBlock(cleanupThreshTime)
+      handler.cleanupOldBlocks(cleanupThreshTime)
       eventually(timeout(10000 millis), interval(10 millis)) {
         getWriteAheadLogFiles().size should be < preCleanupLogFiles.size
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/bd70ff99/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
index 01a09b6..de7e9d6 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala
@@ -166,7 +166,7 @@ class ReceivedBlockTrackerSuite
     // Cleanup first batch but not second batch
     val oldestLogFile = getWriteAheadLogFiles().head
     incrementTime()
-    tracker3.cleanupOldBatches(batchTime2)
+    tracker3.cleanupOldBatches(batchTime2, waitForCompletion = true)
 
     // Verify that the batch allocations have been cleaned, and the act has 
been written to log
     tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual 
Seq.empty

http://git-wip-us.apache.org/repos/asf/spark/blob/bd70ff99/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 8f69bcb..7ce9499 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -182,15 +182,29 @@ class WriteAheadLogSuite extends FunSuite with 
BeforeAndAfter {
   }
 
   test("WriteAheadLogManager - cleanup old logs") {
+    logCleanUpTest(waitForCompletion = false)
+  }
+
+  test("WriteAheadLogManager - cleanup old logs synchronously") {
+    logCleanUpTest(waitForCompletion = true)
+  }
+
+  private def logCleanUpTest(waitForCompletion: Boolean): Unit = {
     // Write data with manager, recover with new manager and verify
     val manualClock = new ManualClock
     val dataToWrite = generateRandomData()
     manager = writeDataUsingManager(testDir, dataToWrite, manualClock, 
stopManager = false)
     val logFiles = getLogFilesInDirectory(testDir)
     assert(logFiles.size > 1)
-    manager.cleanupOldLogs(manualClock.currentTime() / 2)
-    eventually(timeout(1 second), interval(10 milliseconds)) {
+
+    manager.cleanupOldLogs(manualClock.currentTime() / 2, waitForCompletion)
+
+    if (waitForCompletion) {
       assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+    } else {
+      eventually(timeout(1 second), interval(10 milliseconds)) {
+        assert(getLogFilesInDirectory(testDir).size < logFiles.size)
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to