Repository: spark
Updated Branches:
  refs/heads/master 95ae20946 -> 3ae4f07de


[SPARK-17159][STREAM] Significant speed up for running spark streaming against 
Object store.

## What changes were proposed in this pull request?

Original work by Steve Loughran.
Based on #17745.

This is a minimal patch of changes to FileInputDStream to reduce File status 
requests when querying files. Each call to file status is 3+ http calls to 
object store. This patch eliminates the need for it, by using FileStatus 
objects.

This is a minor optimisation when working with filesystems, but significant 
when working with object stores.

## How was this patch tested?

Tests included. Existing tests pass.

Closes #22339 from ScrapCodes/PR_17745.

Lead-authored-by: Prashant Sharma <[email protected]>
Co-authored-by: Steve Loughran <[email protected]>
Signed-off-by: Sean Owen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ae4f07d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ae4f07d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ae4f07d

Branch: refs/heads/master
Commit: 3ae4f07de06e267f0363a53264876ea99dd731df
Parents: 95ae209
Author: Prashant Sharma <[email protected]>
Authored: Fri Oct 5 02:22:06 2018 +0100
Committer: Sean Owen <[email protected]>
Committed: Fri Oct 5 02:22:06 2018 +0100

----------------------------------------------------------------------
 .../streaming/dstream/FileInputDStream.scala    | 57 ++++++------
 .../spark/streaming/InputStreamsSuite.scala     | 98 ++++++++++++++++----
 .../apache/spark/streaming/TestSuiteBase.scala  | 14 ++-
 3 files changed, 118 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3ae4f07d/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index b8a5a96..438847c 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -17,19 +17,19 @@
 
 package org.apache.spark.streaming.dstream
 
-import java.io.{IOException, ObjectInputStream}
+import java.io.{FileNotFoundException, IOException, ObjectInputStream}
 
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.scheduler.StreamInputInfo
-import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, 
Utils}
+import org.apache.spark.util.{SerializableConfiguration, Utils}
 
 /**
  * This class represents an input stream that monitors a Hadoop-compatible 
filesystem for new
@@ -122,9 +122,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   // Set of files that were selected in the remembered batches
   @transient private var recentlySelectedFiles = new mutable.HashSet[String]()
 
-  // Read-through cache of file mod times, used to speed up mod time lookups
-  @transient private var fileToModTime = new TimeStampedHashMap[String, 
Long](true)
-
   // Timestamp of the last round of finding files
   @transient private var lastNewFileFindingTime = 0L
 
@@ -140,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
    * a union RDD out of them. Note that this maintains the list of files that 
were processed
    * in the latest modification time in the previous call to this method. This 
is because the
    * modification time returned by the FileStatus API seems to return times 
only at the
-   * granularity of seconds. And new files may have the same modification time 
as the
+   * granularity of seconds in HDFS. And new files may have the same 
modification time as the
    * latest modification time in the previous call to this method yet was not 
reported in
    * the previous call.
    */
@@ -174,8 +171,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       logDebug("Cleared files are:\n" +
         oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
     }
-    // Delete file mod times that weren't accessed in the last round of 
getting new files
-    fileToModTime.clearOldValues(lastNewFileFindingTime - 1)
   }
 
   /**
@@ -197,29 +192,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       logDebug(s"Getting new files for time $currentTime, " +
         s"ignoring files older than $modTimeIgnoreThreshold")
 
-      val newFileFilter = new PathFilter {
-        def accept(path: Path): Boolean = isNewFile(path, currentTime, 
modTimeIgnoreThreshold)
-      }
-      val directoryFilter = new PathFilter {
-        override def accept(path: Path): Boolean = 
fs.getFileStatus(path).isDirectory
-      }
-      val directories = fs.globStatus(directoryPath, 
directoryFilter).map(_.getPath)
+      val directories = 
Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
+        .filter(_.isDirectory)
+        .map(_.getPath)
       val newFiles = directories.flatMap(dir =>
-        fs.listStatus(dir, newFileFilter).map(_.getPath.toString))
+        fs.listStatus(dir)
+          .filter(isNewFile(_, currentTime, modTimeIgnoreThreshold))
+          .map(_.getPath.toString))
       val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
-      logInfo("Finding new files took " + timeTaken + " ms")
-      logDebug("# cached file times = " + fileToModTime.size)
+      logDebug(s"Finding new files took $timeTaken ms")
       if (timeTaken > slideDuration.milliseconds) {
         logWarning(
-          "Time taken to find new files exceeds the batch size. " +
+          s"Time taken to find new files $timeTaken exceeds the batch size. " +
             "Consider increasing the batch size or reducing the number of " +
-            "files in the monitored directory."
+            "files in the monitored directories."
         )
       }
       newFiles
     } catch {
+      case e: FileNotFoundException =>
+        logWarning(s"No directory to scan: $directoryPath: $e")
+        Array.empty
       case e: Exception =>
-        logWarning("Error finding new files", e)
+        logWarning(s"Error finding new files under $directoryPath", e)
         reset()
         Array.empty
     }
@@ -242,8 +237,16 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
    *   The files with mod time T+5 are not remembered and cannot be ignored 
(since, t+5 > t+1).
    *   Hence they can get selected as new files again. To prevent this, files 
whose mod time is more
    *   than current batch time are not considered.
+   * @param fileStatus file status
+   * @param currentTime time of the batch
+   * @param modTimeIgnoreThreshold the ignore threshold
+   * @return true if the file has been modified within the batch window
    */
-  private def isNewFile(path: Path, currentTime: Long, modTimeIgnoreThreshold: 
Long): Boolean = {
+  private def isNewFile(
+     fileStatus: FileStatus,
+     currentTime: Long,
+     modTimeIgnoreThreshold: Long): Boolean = {
+    val path = fileStatus.getPath
     val pathStr = path.toString
     // Reject file if it does not satisfy filter
     if (!filter(path)) {
@@ -251,7 +254,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       return false
     }
     // Reject file if it was created before the ignore time
-    val modTime = getFileModTime(path)
+    val modTime = fileStatus.getModificationTime()
     if (modTime <= modTimeIgnoreThreshold) {
       // Use <= instead of < to avoid SPARK-4518
       logDebug(s"$pathStr ignored as mod time $modTime <= ignore time 
$modTimeIgnoreThreshold")
@@ -293,11 +296,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     new UnionRDD(context.sparkContext, fileRDDs)
   }
 
-  /** Get file mod time from cache or fetch it from the file system */
-  private def getFileModTime(path: Path) = {
-    fileToModTime.getOrElseUpdate(path.toString, 
fs.getFileStatus(path).getModificationTime())
-  }
-
   private def directoryPath: Path = {
     if (_path == null) _path = new Path(directory)
     _path
@@ -319,7 +317,6 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
     generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
     batchTimeToSelectedFiles = new mutable.HashMap[Time, Array[String]]
     recentlySelectedFiles = new mutable.HashSet[String]()
-    fileToModTime = new TimeStampedHashMap[String, Long](true)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3ae4f07d/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index b5d36a3..1cf21e8 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -27,7 +27,8 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import com.google.common.io.Files
-import org.apache.hadoop.fs.Path
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.scalatest.BeforeAndAfter
@@ -130,10 +131,8 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
   }
 
   test("binary records stream") {
-    var testDir: File = null
-    try {
+    withTempDir { testDir =>
       val batchDuration = Seconds(2)
-      testDir = Utils.createTempDir()
       // Create a file that exists before the StreamingContext is created:
       val existingFile = new File(testDir, "0")
       Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@@ -176,8 +175,6 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
           assert(obtainedOutput(i) === input.map(b => (b + i).toByte))
         }
       }
-    } finally {
-      if (testDir != null) Utils.deleteRecursively(testDir)
     }
   }
 
@@ -190,10 +187,8 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
   }
 
   test("file input stream - wildcard") {
-    var testDir: File = null
-    try {
+    withTempDir { testDir =>
       val batchDuration = Seconds(2)
-      testDir = Utils.createTempDir()
       val testSubDir1 = Utils.createDirectory(testDir.toString, "tmp1")
       val testSubDir2 = Utils.createDirectory(testDir.toString, "tmp2")
 
@@ -221,12 +216,12 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
         // not enough to trigger a batch
         clock.advance(batchDuration.milliseconds / 2)
 
-        def createFileAndAdvenceTime(data: Int, dir: File): Unit = {
+        def createFileAndAdvanceTime(data: Int, dir: File): Unit = {
           val file = new File(testSubDir1, data.toString)
           Files.write(data + "\n", file, StandardCharsets.UTF_8)
           assert(file.setLastModified(clock.getTimeMillis()))
           assert(file.lastModified === clock.getTimeMillis())
-          logInfo("Created file " + file)
+          logInfo(s"Created file $file")
           // Advance the clock after creating the file to avoid a race when
           // setting its modification time
           clock.advance(batchDuration.milliseconds)
@@ -236,18 +231,85 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
         }
         // Over time, create files in the temp directory 1
         val input1 = Seq(1, 2, 3, 4, 5)
-        input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1))
+        input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1))
 
         // Over time, create files in the temp directory 1
         val input2 = Seq(6, 7, 8, 9, 10)
-        input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2))
+        input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2))
 
         // Verify that all the files have been read
         val expectedOutput = (input1 ++ input2).map(_.toString).toSet
         assert(outputQueue.asScala.flatten.toSet === expectedOutput)
       }
-    } finally {
-      if (testDir != null) Utils.deleteRecursively(testDir)
+    }
+  }
+
+  test("Modified files are correctly detected.") {
+    withTempDir { testDir =>
+      val batchDuration = Seconds(2)
+      val durationMs = batchDuration.milliseconds
+      val testPath = new Path(testDir.toURI)
+      val streamDir = new Path(testPath, "streaming")
+      val streamGlobPath = new Path(streamDir, "sub*")
+      val generatedDir = new Path(testPath, "generated")
+      val generatedSubDir = new Path(generatedDir, "subdir")
+      val renamedSubDir = new Path(streamDir, "subdir")
+
+      withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+        val sparkContext = ssc.sparkContext
+        val hc = sparkContext.hadoopConfiguration
+        val fs = FileSystem.get(testPath.toUri, hc)
+
+        fs.delete(testPath, true)
+        fs.mkdirs(testPath)
+        fs.mkdirs(streamDir)
+        fs.mkdirs(generatedSubDir)
+
+        def write(path: Path, text: String): Unit = {
+          val out = fs.create(path, true)
+          IOUtils.write(text, out)
+          out.close()
+        }
+
+        val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+        val existingFile = new Path(generatedSubDir, "existing")
+        write(existingFile, "existing\n")
+        val status = fs.getFileStatus(existingFile)
+        clock.setTime(status.getModificationTime + durationMs)
+        val batchCounter = new BatchCounter(ssc)
+        val fileStream = ssc.textFileStream(streamGlobPath.toUri.toString)
+        val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+        val outputStream = new TestOutputStream(fileStream, outputQueue)
+        outputStream.register()
+
+        ssc.start()
+        clock.advance(durationMs)
+        eventually(eventuallyTimeout) {
+          assert(1 === batchCounter.getNumCompletedBatches)
+        }
+        // create and rename the file
+        // put a file into the generated directory
+        val textPath = new Path(generatedSubDir, "renamed.txt")
+        write(textPath, "renamed\n")
+        val now = clock.getTimeMillis()
+        val modTime = now + durationMs / 2
+        fs.setTimes(textPath, modTime, modTime)
+        val textFilestatus = fs.getFileStatus(existingFile)
+        assert(textFilestatus.getModificationTime < now + durationMs)
+
+        // rename the directory under the path being scanned
+        fs.rename(generatedSubDir, renamedSubDir)
+
+        // move forward one window
+        clock.advance(durationMs)
+        // await the next scan completing
+        eventually(eventuallyTimeout) {
+          assert(2 === batchCounter.getNumCompletedBatches)
+        }
+        // verify that the "renamed" file is found, but not the "existing" one 
which is out of
+        // the window
+        assert(Set("renamed") === outputQueue.asScala.flatten.toSet)
+      }
     }
   }
 
@@ -416,10 +478,8 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
   }
 
   def testFileStream(newFilesOnly: Boolean) {
-    var testDir: File = null
-    try {
+    withTempDir { testDir =>
       val batchDuration = Seconds(2)
-      testDir = Utils.createTempDir()
       // Create a file that exists before the StreamingContext is created:
       val existingFile = new File(testDir, "0")
       Files.write("0\n", existingFile, StandardCharsets.UTF_8)
@@ -466,8 +526,6 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
         }
         assert(outputQueue.asScala.flatten.toSet === expectedOutput)
       }
-    } finally {
-      if (testDir != null) Utils.deleteRecursively(testDir)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3ae4f07d/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index dbab708..ada494e 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import java.io.{IOException, ObjectInputStream}
+import java.io.{File, IOException, ObjectInputStream}
 import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.JavaConverters._
@@ -557,4 +557,16 @@ trait TestSuiteBase extends SparkFunSuite with 
BeforeAndAfter with Logging {
       verifyOutput[W](output.toSeq, expectedOutput, useSet)
     }
   }
+
+  /**
+   * Creates a temporary directory, which is then passed to `f` and will be 
deleted after `f`
+   * returns.
+   * (originally from `SqlTestUtils`.)
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withTempDir(f: File => Unit): Unit = {
+    val dir = Utils.createTempDir().getCanonicalFile
+    try f(dir) finally Utils.deleteRecursively(dir)
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to