This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 02f7fb333977 [SPARK-52339][SQL][3.5] Fix comparison of 
`InMemoryFileIndex` instances
02f7fb333977 is described below

commit 02f7fb33397798d72c71b48e320bfe5d876899be
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Tue Jun 24 14:03:59 2025 +0800

    [SPARK-52339][SQL][3.5] Fix comparison of `InMemoryFileIndex` instances
    
    ### What changes were proposed in this pull request?
    
    This is a back-port of #51043.
    
    This PR changes `InMemoryFileIndex#equals` to compare the sorted sequences 
of root paths (duplicates retained) rather than distinct sets of root paths. 
Without this change, `InMemoryFileIndex#equals` considers the following two 
collections of root paths to be equal, even though they yield a different 
number of rows:
    ```
    ["/tmp/test", "/tmp/test"]
    ["/tmp/test", "/tmp/test", "/tmp/test"]
    ```
    
    ### Why are the changes needed?
    
    The bug can cause correctness issues, e.g.
    ```
    // create test data
    val data = Seq((1, 2), (2, 3)).toDF("a", "b")
    data.write.mode("overwrite").csv("/tmp/test")
    
    val fileList1 = List.fill(2)("/tmp/test")
    val fileList2 = List.fill(3)("/tmp/test")
    
    val df1 = spark.read.schema("a int, b int").csv(fileList1: _*)
    val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)
    
    df1.count() // correctly returns 4
    df2.count() // correctly returns 6
    
    // the following is the same as above, except df1 is persisted
    val df1 = spark.read.schema("a int, b int").csv(fileList1: _*).persist
    val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)
    
    df1.count() // correctly returns 4
    df2.count() // incorrectly returns 4!!
    ```
    In the above example, df1 and df2 were created with a different number of 
paths: df1 has 2, and df2 has 3. But since the distinct set of root paths is 
the same (e.g., `Set("/tmp/test") == Set("/tmp/test")`), the two dataframes are 
considered equal. Thus, when df1 is persisted, df2 uses df1's cached plan.
    
    The same bug also causes inappropriate exchange reuse.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51256 from bersprockets/multi_path_issue_br35.
    
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../execution/datasources/InMemoryFileIndex.scala  |  2 +-
 .../sql/execution/datasources/FileIndexSuite.scala | 24 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 44d31131e9c6..8920ff88be51 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -101,7 +101,7 @@ class InMemoryFileIndex(
   }
 
   override def equals(other: Any): Boolean = other match {
-    case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+    case hdfs: InMemoryFileIndex => rootPaths.sorted == hdfs.rootPaths.sorted
     case _ => false
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 9ac61f0cee5f..54403ea99c81 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -554,6 +554,30 @@ class FileIndexSuite extends SharedSparkSession {
     assert(FileIndexOptions.isValidOption("modifiedafter"))
     assert(FileIndexOptions.isValidOption("pathglobfilter"))
   }
+
+  test("SPARK-52339: Correctly compare root paths") {
+    withTempDir { dir =>
+      val file1 = new File(dir, "text1.txt")
+      stringToFile(file1, "text1")
+      val file2 = new File(dir, "text2.txt")
+      stringToFile(file2, "text2")
+      val path1 = new Path(file1.getCanonicalPath)
+      val path2 = new Path(file2.getCanonicalPath)
+
+      val schema = StructType(Seq(StructField("a", StringType, false)))
+
+      // Verify that the order of paths doesn't matter
+      val fileIndex1a = new InMemoryFileIndex(spark, Seq(path1, path2), 
Map.empty, Some(schema))
+      val fileIndex1b = new InMemoryFileIndex(spark, Seq(path2, path1), 
Map.empty, Some(schema))
+      assert(fileIndex1a == fileIndex1b)
+
+      // Verify that a different number of paths does matter
+      val fileIndex2a = new InMemoryFileIndex(spark, Seq(path1, path1), 
Map.empty, Some(schema))
+      val fileIndex2b = new InMemoryFileIndex(spark, Seq(path1, path1, path1),
+        Map.empty, Some(schema))
+      assert(fileIndex2a != fileIndex2b)
+    }
+  }
 }
 
 object DeletionRaceFileSystem {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to