This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ccded6c21b28 [SPARK-52339][SQL] Fix comparison of `InMemoryFileIndex`
instances
ccded6c21b28 is described below
commit ccded6c21b28e6579df93b7f25a6c00c056f3d09
Author: Bruce Robbins <[email protected]>
AuthorDate: Mon Jun 23 14:21:00 2025 +0800
[SPARK-52339][SQL] Fix comparison of `InMemoryFileIndex` instances
### What changes were proposed in this pull request?
This PR changes `InMemoryFileIndex#equals` to compare a non-distinct
collection of root paths rather than a distinct set of root paths. Without this
change, `InMemoryFileIndex#equals` considers the following two collections of
root paths to be equal, even though they represent a different number of rows:
```
["/tmp/test", "/tmp/test"]
["/tmp/test", "/tmp/test", "/tmp/test"]
```
### Why are the changes needed?
The bug can cause correctness issues, e.g.
```
// create test data
val data = Seq((1, 2), (2, 3)).toDF("a", "b")
data.write.mode("overwrite").csv("/tmp/test")
val fileList1 = List.fill(2)("/tmp/test")
val fileList2 = List.fill(3)("/tmp/test")
val df1 = spark.read.schema("a int, b int").csv(fileList1: _*)
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)
df1.count() // correctly returns 4
df2.count() // correctly returns 6
// the following is the same as above, except df1 is persisted
val df1 = spark.read.schema("a int, b int").csv(fileList1: _*).persist
val df2 = spark.read.schema("a int, b int").csv(fileList2: _*)
df1.count() // correctly returns 4
df2.count() // incorrectly returns 4!!
```
In the above example, df1 and df2 were created with a different number of
paths: df1 has 2, and df2 has 3. But since the distinct set of root paths is
the same (e.g., `Set("/tmp/test") == Set("/tmp/test")`), the two dataframes are
considered equal. Thus, when df1 is persisted, df2 uses df1's cached plan.
The same bug also causes inappropriate exchange reuse.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit test.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51043 from bersprockets/multi_path_issue.
Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/datasources/InMemoryFileIndex.scala | 2 +-
.../sql/execution/datasources/FileIndexSuite.scala | 24 ++++++++++++++++++++++
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 3b8a20c7cf74..ec258e2e4645 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -103,7 +103,7 @@ class InMemoryFileIndex(
}
override def equals(other: Any): Boolean = other match {
- case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet
+ case hdfs: InMemoryFileIndex => rootPaths.sorted == hdfs.rootPaths.sorted
case _ => false
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 33b4cc1d2e7f..1150f6163b97 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -656,6 +656,30 @@ class FileIndexSuite extends SharedSparkSession {
assert(FileIndexOptions.isValidOption("modifiedafter"))
assert(FileIndexOptions.isValidOption("pathglobfilter"))
}
+
+ test("SPARK-52339: Correctly compare root paths") {
+ withTempDir { dir =>
+ val file1 = new File(dir, "text1.txt")
+ stringToFile(file1, "text1")
+ val file2 = new File(dir, "text2.txt")
+ stringToFile(file2, "text2")
+ val path1 = new Path(file1.getCanonicalPath)
+ val path2 = new Path(file2.getCanonicalPath)
+
+ val schema = StructType(Seq(StructField("a", StringType, false)))
+
+ // Verify that the order of paths doesn't matter
+ val fileIndex1a = new InMemoryFileIndex(spark, Seq(path1, path2),
Map.empty, Some(schema))
+ val fileIndex1b = new InMemoryFileIndex(spark, Seq(path2, path1),
Map.empty, Some(schema))
+ assert(fileIndex1a == fileIndex1b)
+
+ // Verify that a different number of paths does matter
+ val fileIndex2a = new InMemoryFileIndex(spark, Seq(path1, path1),
Map.empty, Some(schema))
+ val fileIndex2b = new InMemoryFileIndex(spark, Seq(path1, path1, path1),
+ Map.empty, Some(schema))
+ assert(fileIndex2a != fileIndex2b)
+ }
+ }
}
object DeletionRaceFileSystem {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]