This is an automated email from the ASF dual-hosted git repository. yao pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 02f7fb333977 [SPARK-52339][SQL][3.5] Fix comparison of `InMemoryFileIndex` instances 02f7fb333977 is described below commit 02f7fb33397798d72c71b48e320bfe5d876899be Author: Bruce Robbins <bersprock...@gmail.com> AuthorDate: Tue Jun 24 14:03:59 2025 +0800 [SPARK-52339][SQL][3.5] Fix comparison of `InMemoryFileIndex` instances ### What changes were proposed in this pull request? This is a back-port of #51043. This PR changes `InMemoryFileIndex#equals` to compare a non-distinct collection of root paths rather than a distinct set of root paths. Without this change, `InMemoryFileIndex#equals` considers the following two collections of root paths to be equal, even though they represent a different number of rows: ``` ["/tmp/test", "/tmp/test"] ["/tmp/test", "/tmp/test", "/tmp/test"] ``` ### Why are the changes needed? The bug can cause correctness issues, e.g. ``` // create test data val data = Seq((1, 2), (2, 3)).toDF("a", "b") data.write.mode("overwrite").csv("/tmp/test") val fileList1 = List.fill(2)("/tmp/test") val fileList2 = List.fill(3)("/tmp/test") val df1 = spark.read.schema("a int, b int").csv(fileList1: _*) val df2 = spark.read.schema("a int, b int").csv(fileList2: _*) df1.count() // correctly returns 4 df2.count() // correctly returns 6 // the following is the same as above, except df1 is persisted val df1 = spark.read.schema("a int, b int").csv(fileList1: _*).persist val df2 = spark.read.schema("a int, b int").csv(fileList2: _*) df1.count() // correctly returns 4 df2.count() // incorrectly returns 4!! ``` In the above example, df1 and df2 were created with a different number of paths: df1 has 2, and df2 has 3. But since the distinct set of root paths is the same (e.g., `Set("/tmp/test") == Set("/tmp/test")`), the two dataframes are considered equal. Thus, when df1 is persisted, df2 uses df1's cached plan. The same bug also causes inappropriate exchange reuse. 
### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #51256 from bersprockets/multi_path_issue_br35. Authored-by: Bruce Robbins <bersprock...@gmail.com> Signed-off-by: Kent Yao <y...@apache.org> --- .../execution/datasources/InMemoryFileIndex.scala | 2 +- .../sql/execution/datasources/FileIndexSuite.scala | 24 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index 44d31131e9c6..8920ff88be51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -101,7 +101,7 @@ class InMemoryFileIndex( } override def equals(other: Any): Boolean = other match { - case hdfs: InMemoryFileIndex => rootPaths.toSet == hdfs.rootPaths.toSet + case hdfs: InMemoryFileIndex => rootPaths.sorted == hdfs.rootPaths.sorted case _ => false } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala index 9ac61f0cee5f..54403ea99c81 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala @@ -554,6 +554,30 @@ class FileIndexSuite extends SharedSparkSession { assert(FileIndexOptions.isValidOption("modifiedafter")) assert(FileIndexOptions.isValidOption("pathglobfilter")) } + + test("SPARK-52339: Correctly compare root paths") { + withTempDir { dir => + val file1 = new File(dir, "text1.txt") + stringToFile(file1, "text1") + val file2 = new 
File(dir, "text2.txt") + stringToFile(file2, "text2") + val path1 = new Path(file1.getCanonicalPath) + val path2 = new Path(file2.getCanonicalPath) + + val schema = StructType(Seq(StructField("a", StringType, false))) + + // Verify that the order of paths doesn't matter + val fileIndex1a = new InMemoryFileIndex(spark, Seq(path1, path2), Map.empty, Some(schema)) + val fileIndex1b = new InMemoryFileIndex(spark, Seq(path2, path1), Map.empty, Some(schema)) + assert(fileIndex1a == fileIndex1b) + + // Verify that a different number of paths does matter + val fileIndex2a = new InMemoryFileIndex(spark, Seq(path1, path1), Map.empty, Some(schema)) + val fileIndex2b = new InMemoryFileIndex(spark, Seq(path1, path1, path1), + Map.empty, Some(schema)) + assert(fileIndex2a != fileIndex2b) + } + } } object DeletionRaceFileSystem { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org