This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e702b32656bc [SPARK-48314][SS] Don't double cache files for 
FileStreamSource using Trigger.AvailableNow
e702b32656bc is described below

commit e702b32656bcbe194be19876990954a4be457734
Author: Adam Binford <adam...@gmail.com>
AuthorDate: Wed May 22 10:58:02 2024 +0900

    [SPARK-48314][SS] Don't double cache files for FileStreamSource using 
Trigger.AvailableNow
    
    ### What changes were proposed in this pull request?
    
    Files don't need to be cached for reuse in `FileStreamSource` when using 
`Trigger.AvailableNow` because all files are already cached for the lifetime of 
the query in `allFilesForTriggerAvailableNow`.
    
    ### Why are the changes needed?
    
    As reported in https://issues.apache.org/jira/browse/SPARK-44924 (with a PR 
to address https://github.com/apache/spark/pull/45362), the hard coded cap of 
10k files being cached can cause problems when using a maxFilesPerTrigger > 
10k. It causes every other batch to be 10k files, which can greatly limit the 
throughput of a new streaming trying to catch up.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Every other streaming batch won't be 10k files if using 
Trigger.AvailableNow and maxFilesPerTrigger greater than 10k.
    
    ### How was this patch tested?
    
    New UT
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #46627 from Kimahriman/available-now-no-cache.
    
    Authored-by: Adam Binford <adam...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/execution/streaming/FileStreamSource.scala | 10 +++--
 .../sql/streaming/FileStreamSourceSuite.scala      | 45 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 373a122e0001..4a9b2d11b7e0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -184,9 +184,11 @@ class FileStreamSource(
       }
     }
 
+    val shouldCache = !sourceOptions.latestFirst && 
allFilesForTriggerAvailableNow == null
+
     // Obey user's setting to limit the number of files in this batch trigger.
     val (batchFiles, unselectedFiles) = limit match {
-      case files: ReadMaxFiles if !sourceOptions.latestFirst =>
+      case files: ReadMaxFiles if shouldCache =>
         // we can cache and reuse remaining fetched list of files in further 
batches
         val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles())
         if (usFiles.size < files.maxFiles() * discardCachedInputRatio) {
@@ -200,10 +202,10 @@ class FileStreamSource(
         }
 
       case files: ReadMaxFiles =>
-        // implies "sourceOptions.latestFirst = true" which we want to refresh 
the list per batch
+        // don't use the cache, just take files for the next batch
         (newFiles.take(files.maxFiles()), null)
 
-      case files: ReadMaxBytes if !sourceOptions.latestFirst =>
+      case files: ReadMaxBytes if shouldCache =>
         // we can cache and reuse remaining fetched list of files in further 
batches
         val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
           takeFilesUntilMax(newFiles, files.maxBytes())
@@ -218,8 +220,8 @@ class FileStreamSource(
         }
 
       case files: ReadMaxBytes =>
+        // don't use the cache, just take files for the next batch
         val (FilesSplit(bFiles, _), _) = takeFilesUntilMax(newFiles, 
files.maxBytes())
-        // implies "sourceOptions.latestFirst = true" which we want to refresh 
the list per batch
         (bFiles, null)
 
       case _: ReadAllAvailable => (newFiles, null)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index ff3cc5c247df..ca4f2a7f26ce 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -2448,6 +2448,51 @@ class FileStreamSourceSuite extends FileStreamSourceTest 
{
     }
   }
 
+  test("SPARK-48314: Don't cache unread files when using 
Trigger.AvailableNow") {
+    withCountListingLocalFileSystemAsLocalFileSystem {
+      withThreeTempDirs { case (src, meta, tmp) =>
+        val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> 
"5",
+          "maxCachedFiles" -> "2")
+        val scheme = CountListingLocalFileSystem.scheme
+        val source = new FileStreamSource(spark, 
s"$scheme:///${src.getCanonicalPath}/*/*", "text",
+          StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
+        val _metadataLog = 
PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
+        val metadataLog = source invokePrivate _metadataLog()
+
+        // provide 20 files in src, with sequential "last modified" to 
guarantee ordering
+        (0 to 19).map { idx =>
+            val f = createFile(idx.toString, new File(src, idx.toString), tmp)
+          f.setLastModified(idx * 10000)
+          f
+        }
+
+        source.prepareForTriggerAvailableNow()
+        CountListingLocalFileSystem.resetCount()
+
+        var offset = source.latestOffset(FileStreamSourceOffset(-1L), 
ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        var files = 
metadataLog.get(offset.logOffset).getOrElse(Array.empty[FileEntry])
+
+        // All files are already tracked in allFilesForTriggerAvailableNow
+        assert(0 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        // Should be 5 files in the batch based on maxFiles limit
+        assert(files.length == 5)
+
+        // Reading again leverages the files already tracked in 
allFilesForTriggerAvailableNow,
+        // so no more listings need to happen
+        offset = source.latestOffset(FileStreamSourceOffset(-1L), 
ReadLimit.maxFiles(5))
+          .asInstanceOf[FileStreamSourceOffset]
+        files = 
metadataLog.get(offset.logOffset).getOrElse(Array.empty[FileEntry])
+
+        assert(0 === CountListingLocalFileSystem.pathToNumListStatusCalled
+          .get(src.getCanonicalPath).map(_.get()).getOrElse(0))
+        // Should be 5 files in the batch since cached files are ignored
+        assert(files.length == 5)
+      }
+    }
+  }
+
   test("SPARK-31962: file stream source shouldn't allow 
modifiedBefore/modifiedAfter") {
     def formatTime(time: LocalDateTime): String = {
       time.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to