Repository: spark
Updated Branches:
  refs/heads/branch-2.0 54d4eee51 -> d3f90e71a


[SPARK-17640][SQL] Avoid using -1 as the default batchId for 
FileStreamSource.FileEntry

## What changes were proposed in this pull request?

Avoid using -1 as the default batchId for FileStreamSource.FileEntry so that we 
can make sure not writing any FileEntry(..., batchId = -1) into the log. This 
also avoids people misusing it in future (#15203 is an example).

## How was this patch tested?

Jenkins.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #15206 from zsxwing/cleanup.

(cherry picked from commit 62ccf27ab4b55e734646678ae78b7e812262d14b)
Signed-off-by: Shixiong Zhu <shixi...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3f90e71
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3f90e71
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3f90e71

Branch: refs/heads/branch-2.0
Commit: d3f90e71af57162afc0648adbc52b810a883ceac
Parents: 54d4eee
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Sep 22 23:35:08 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Thu Sep 22 23:35:15 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/FileStreamSource.scala  | 37 ++++++++++----------
 .../streaming/FileStreamSourceSuite.scala       | 24 ++++++-------
 2 files changed, 31 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d3f90e71/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 4515f9a..8c3e718 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -59,7 +59,7 @@ class FileStreamSource(
   val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
 
   metadataLog.allFiles().foreach { entry =>
-    seenFiles.add(entry)
+    seenFiles.add(entry.path, entry.timestamp)
   }
   seenFiles.purge()
 
@@ -73,14 +73,16 @@ class FileStreamSource(
    */
   private def fetchMaxOffset(): LongOffset = synchronized {
     // All the new files found - ignore aged files and files that we have seen.
-    val newFiles = fetchAllFiles().filter(seenFiles.isNewFile)
+    val newFiles = fetchAllFiles().filter {
+      case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
+    }
 
     // Obey user's setting to limit the number of files in this batch trigger.
     val batchFiles =
       if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else 
newFiles
 
     batchFiles.foreach { file =>
-      seenFiles.add(file)
+      seenFiles.add(file._1, file._2)
       logDebug(s"New file: $file")
     }
     val numPurged = seenFiles.purge()
@@ -95,7 +97,9 @@ class FileStreamSource(
 
     if (batchFiles.nonEmpty) {
       maxBatchId += 1
-      metadataLog.add(maxBatchId, batchFiles.map(_.copy(batchId = 
maxBatchId)).toArray)
+      metadataLog.add(maxBatchId, batchFiles.map { case (path, timestamp) =>
+        FileEntry(path = path, timestamp = timestamp, batchId = maxBatchId)
+      }.toArray)
       logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} 
new files")
     }
 
@@ -140,12 +144,12 @@ class FileStreamSource(
   /**
    * Returns a list of files found, sorted by their timestamp.
    */
-  private def fetchAllFiles(): Seq[FileEntry] = {
+  private def fetchAllFiles(): Seq[(String, Long)] = {
     val startTime = System.nanoTime
     val globbedPaths = 
SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
     val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, 
Some(new StructType))
     val files = catalog.allFiles().sortBy(_.getModificationTime).map { status 
=>
-      FileEntry(status.getPath.toUri.toString, status.getModificationTime)
+      (status.getPath.toUri.toString, status.getModificationTime)
     }
     val endTime = System.nanoTime
     val listingTimeMs = (endTime.toDouble - startTime) / 1000000
@@ -172,10 +176,7 @@ object FileStreamSource {
   /** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
   type Timestamp = Long
 
-  val NOT_SET = -1L
-
-  case class FileEntry(path: String, timestamp: Timestamp, batchId: Long = 
NOT_SET)
-    extends Serializable
+  case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) 
extends Serializable
 
   /**
    * A custom hash map used to track the list of files seen. This map is not 
thread-safe.
@@ -196,10 +197,10 @@ object FileStreamSource {
     private var lastPurgeTimestamp: Timestamp = 0L
 
     /** Add a new file to the map. */
-    def add(file: FileEntry): Unit = {
-      map.put(file.path, file.timestamp)
-      if (file.timestamp > latestTimestamp) {
-        latestTimestamp = file.timestamp
+    def add(path: String, timestamp: Timestamp): Unit = {
+      map.put(path, timestamp)
+      if (timestamp > latestTimestamp) {
+        latestTimestamp = timestamp
       }
     }
 
@@ -207,10 +208,10 @@ object FileStreamSource {
      * Returns true if we should consider this file a new file. The file is 
only considered "new"
      * if it is new enough that we are still tracking, and we have not seen it 
before.
      */
-    def isNewFile(file: FileEntry): Boolean = {
+    def isNewFile(path: String, timestamp: Timestamp): Boolean = {
       // Note that we are testing against lastPurgeTimestamp here so we'd 
never miss a file that
       // is older than (latestTimestamp - maxAgeMs) but has not been purged 
yet.
-      file.timestamp >= lastPurgeTimestamp && !map.containsKey(file.path)
+      timestamp >= lastPurgeTimestamp && !map.containsKey(path)
     }
 
     /** Removes aged entries and returns the number of files removed. */
@@ -230,8 +231,8 @@ object FileStreamSource {
 
     def size: Int = map.size()
 
-    def allEntries: Seq[FileEntry] = {
-      map.entrySet().asScala.map(entry => FileEntry(entry.getKey, 
entry.getValue)).toSeq
+    def allEntries: Seq[(String, Timestamp)] = {
+      map.asScala.toSeq
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d3f90e71/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
index dfe4bb8..1793db0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -36,51 +36,51 @@ class FileStreamSourceSuite extends SparkFunSuite with 
SharedSQLContext {
   test("SeenFilesMap") {
     val map = new SeenFilesMap(maxAgeMs = 10)
 
-    map.add(FileEntry("a", 5))
+    map.add("a", 5)
     assert(map.size == 1)
     map.purge()
     assert(map.size == 1)
 
     // Add a new entry and purge should be no-op, since the gap is exactly 10 
ms.
-    map.add(FileEntry("b", 15))
+    map.add("b", 15)
     assert(map.size == 2)
     map.purge()
     assert(map.size == 2)
 
     // Add a new entry that's more than 10 ms than the first entry. We should 
be able to purge now.
-    map.add(FileEntry("c", 16))
+    map.add("c", 16)
     assert(map.size == 3)
     map.purge()
     assert(map.size == 2)
 
     // Override existing entry shouldn't change the size
-    map.add(FileEntry("c", 25))
+    map.add("c", 25)
     assert(map.size == 2)
 
     // Not a new file because we have seen c before
-    assert(!map.isNewFile(FileEntry("c", 20)))
+    assert(!map.isNewFile("c", 20))
 
     // Not a new file because timestamp is too old
-    assert(!map.isNewFile(FileEntry("d", 5)))
+    assert(!map.isNewFile("d", 5))
 
     // Finally a new file: never seen and not too old
-    assert(map.isNewFile(FileEntry("e", 20)))
+    assert(map.isNewFile("e", 20))
   }
 
   test("SeenFilesMap should only consider a file old if it is earlier than 
last purge time") {
     val map = new SeenFilesMap(maxAgeMs = 10)
 
-    map.add(FileEntry("a", 20))
+    map.add("a", 20)
     assert(map.size == 1)
 
     // Timestamp 5 should still considered a new file because purge time 
should be 0
-    assert(map.isNewFile(FileEntry("b", 9)))
-    assert(map.isNewFile(FileEntry("b", 10)))
+    assert(map.isNewFile("b", 9))
+    assert(map.isNewFile("b", 10))
 
     // Once purge, purge time should be 10 and then b would be a old file if 
it is less than 10.
     map.purge()
-    assert(!map.isNewFile(FileEntry("b", 9)))
-    assert(map.isNewFile(FileEntry("b", 10)))
+    assert(!map.isNewFile("b", 9))
+    assert(map.isNewFile("b", 10))
   }
 
   testWithUninterruptibleThread("do not recheck that files exist during 
getBatch") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to