This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8430dbfd9482 [SPARK-53961][SQL][TESTS] Fix `FileStreamSinkSuite` flakiness by using `walkFileTree` instead of `walk`
8430dbfd9482 is described below

commit 8430dbfd948212c5b86703e9bce0ab2013ef5d01
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Mon Oct 20 20:25:18 2025 -0700

    [SPARK-53961][SQL][TESTS] Fix `FileStreamSinkSuite` flakiness by using `walkFileTree` instead of `walk`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to fix `FileStreamSinkSuite` flakiness by using `walkFileTree` instead of `walk`.
    
    ### Why are the changes needed?
    
    `Files.walk` is flaky: when files in the directory are deleted concurrently (a race condition), it fails like the log below. `walkFileTree` provides more robust error handling through its `visitFileFailed` callback; a minimal sketch contrasting the two APIs follows the stack trace.
    
    https://github.com/apache/spark/blob/2bb73fbdeb19f0a972786d3ea33d3263bf84ab66/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala#L543-L547
    
    ```
    [info] - cleanup complete but invalid output for aborted job *** FAILED *** (438 milliseconds)
    [info]   java.io.UncheckedIOException: java.nio.file.NoSuchFileException: ***/spark-4c7ad10b-5848-45d7-ba43-dae4020ad011/output #output/part-00007-e582f3e3-87e3-40fa-8269-7fac9b545775-c000.snappy.parquet
    [info]   at java.base/java.nio.file.FileTreeIterator.fetchNextIfNeeded(FileTreeIterator.java:87)
    [info]   at java.base/java.nio.file.FileTreeIterator.hasNext(FileTreeIterator.java:103)
    [info]   at java.base/java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1855)
    [info]   at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:292)
    [info]   at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
    [info]   at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:169)
    [info]   at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:298)
    [info]   at java.base/java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681)
    [info]   at scala.collection.convert.JavaCollectionWrappers$JIteratorWrapper.hasNext(JavaCollectionWrappers.scala:46)
    [info]   at scala.collection.Iterator$$anon$6.hasNext(Iterator.scala:480)
    [info]   at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
    [info]   at scala.collection.mutable.Growable.addAll(Growable.scala:61)
    [info]   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
    [info]   at scala.collection.immutable.SetBuilderImpl.addAll(Set.scala:405)
    [info]   at scala.collection.immutable.Set$.from(Set.scala:362)
    [info]   at scala.collection.IterableOnceOps.toSet(IterableOnce.scala:1469)
    [info]   at scala.collection.IterableOnceOps.toSet$(IterableOnce.scala:1469)
    [info]   at scala.collection.AbstractIterator.toSet(Iterator.scala:1306)
    [info]   at org.apache.spark.sql.streaming.FileStreamSinkSuite.$anonfun$new$52(FileStreamSinkSuite.scala:537)
    ```
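
    To make the failure mode concrete, here is a minimal, hypothetical sketch (not part of this commit) that races deletions against both traversal APIs; the object name `WalkRaceDemo`, the temp-directory prefix, and the file counts are all made up:

    ```scala
    import java.io.{IOException, UncheckedIOException}
    import java.nio.file.{FileVisitResult, Files, NoSuchFileException, Path, SimpleFileVisitor}

    object WalkRaceDemo {
      def main(args: Array[String]): Unit = {
        val demoDir: Path = Files.createTempDirectory("walk-race-demo")
        (1 to 500).foreach(i => Files.createFile(demoDir.resolve(s"part-$i.parquet")))

        // Delete the files concurrently to provoke the race.
        val deleter = new Thread(new Runnable {
          override def run(): Unit =
            (1 to 500).foreach(i => Files.deleteIfExists(demoDir.resolve(s"part-$i.parquet")))
        })
        deleter.start()

        try {
          // Files.walk may observe an entry that the deleter removes before it is
          // visited, and then throw UncheckedIOException(NoSuchFileException).
          Files.walk(demoDir).forEach(_ => ())
        } catch {
          case e: UncheckedIOException => println(s"walk failed: ${e.getCause}")
        }

        // walkFileTree reports the same condition to visitFileFailed, so the
        // traversal can skip the vanished entry and finish normally.
        Files.walkFileTree(demoDir, new SimpleFileVisitor[Path] {
          override def visitFileFailed(file: Path, exc: IOException): FileVisitResult =
            exc match {
              case _: NoSuchFileException => FileVisitResult.CONTINUE
              case _ => FileVisitResult.TERMINATE
            }
        })
        deleter.join()
      }
    }
    ```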
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is a test case change.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52671 from dongjoon-hyun/SPARK-53961.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../spark/sql/streaming/FileStreamSinkSuite.scala  | 24 ++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index b0aa71a7e1b3..4c06a0109e34 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming
 
 import java.io.{File, IOException}
 import java.nio.file.{Files, Paths}
+import java.nio.file.attribute.BasicFileAttributes
 import java.util.Locale
 
 import scala.collection.mutable.ArrayBuffer
@@ -534,10 +535,25 @@ abstract class FileStreamSinkSuite extends StreamTest {
         }
 
         import PendingCommitFilesTrackingManifestFileCommitProtocol._
-        val outputFileNames = Files.walk(outputDir.toPath).iterator().asScala
-          .filter(_.toString.endsWith(".parquet"))
-          .map(_.getFileName.toString)
-          .toSet
+        import java.nio.file.{Path, _}
+        val outputFileNames = scala.collection.mutable.Set.empty[String]
+        Files.walkFileTree(
+          outputDir.toPath,
+          new SimpleFileVisitor[Path] {
+            override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
+              val fileName = file.getFileName.toString
+              if (fileName.endsWith(".parquet")) outputFileNames += fileName
+              FileVisitResult.CONTINUE
+            }
+            override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = {
+              exc match {
+                case _: NoSuchFileException =>
+                  FileVisitResult.CONTINUE
+                case _ =>
+                  FileVisitResult.TERMINATE
+              }
+            }
+          })
        val trackingFileNames = tracking.map(SparkPath.fromUrlString(_).toPath.getName).toSet
 
         // there would be possible to have race condition:
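
For reference, the pattern added by this patch can be factored into a standalone helper. The following is a hypothetical sketch (the object `ParquetFileNames` and its `collect` method are not part of the patch) showing the same `SimpleFileVisitor` logic in self-contained form:

```scala
import java.io.IOException
import java.nio.file.{FileVisitResult, Files, NoSuchFileException, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes

object ParquetFileNames {
  // Collect the names of all .parquet files under `dir`, tolerating files
  // that vanish between being listed and being visited (the race fixed here).
  def collect(dir: Path): Set[String] = {
    val names = scala.collection.mutable.Set.empty[String]
    Files.walkFileTree(dir, new SimpleFileVisitor[Path] {
      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
        val fileName = file.getFileName.toString
        if (fileName.endsWith(".parquet")) names += fileName
        FileVisitResult.CONTINUE
      }
      override def visitFileFailed(file: Path, exc: IOException): FileVisitResult = exc match {
        case _: NoSuchFileException => FileVisitResult.CONTINUE // skip vanished files
        case _ => FileVisitResult.TERMINATE
      }
    })
    names.toSet
  }
}

// Hypothetical usage, mirroring the test:
// val outputFileNames = ParquetFileNames.collect(outputDir.toPath)
```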


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
