This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 26a9ba6481c4 [SPARK-52609][SQL] Make sure row sorter from each task 
won't be overwritten by other tasks in parallel
26a9ba6481c4 is described below

commit 26a9ba6481c4700a6ffbb83e82cde6d49d8160da
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Mon Jun 30 07:19:12 2025 -0700

    [SPARK-52609][SQL] Make sure row sorter from each task won't be overwritten 
by other tasks in parallel
    
    ### What changes were proposed in this pull request?
    
    This moves the row sorter inside `SortExec` to a thread local variable, to 
make sure the row sorter from each task won't be overwritten by other tasks.
    
    ### Why are the changes needed?
    
    While working on some POCs, we found that strange errors, such as SIGSEGV, 
occurred nondeterministically in the external sorter class. We figured out 
that its usage in `SortExec` is not thread-safe.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. It is internal to `SortExec`.
    
    ### How was this patch tested?
    
    Manually tested. Existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #51315 from viirya/fix_sorter.
    
    Authored-by: Liang-Chi Hsieh <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../scala/org/apache/spark/sql/execution/SortExec.scala | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
index 5abc6f3ed576..11fde41aae9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
@@ -62,7 +62,9 @@ case class SortExec(
     "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))
 
-  private[sql] var rowSorter: UnsafeExternalRowSorter = _
+  // Each task has its own instance of UnsafeExternalRowSorter. It is created 
in the
+  // createSorter method and stored in a ThreadLocal variable.
+  private[sql] var rowSorter: ThreadLocal[UnsafeExternalRowSorter] = _
 
   /**
    * This method gets invoked only once for each SortExec instance to 
initialize an
@@ -71,6 +73,8 @@ case class SortExec(
    * should make it public.
    */
   def createSorter(): UnsafeExternalRowSorter = {
+    rowSorter = new ThreadLocal[UnsafeExternalRowSorter]()
+
     val ordering = RowOrdering.create(sortOrder, output)
 
     // The comparator for comparing prefix
@@ -95,13 +99,14 @@ case class SortExec(
     }
 
     val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
-    rowSorter = UnsafeExternalRowSorter.create(
+    val newRowSorter = UnsafeExternalRowSorter.create(
       schema, ordering, prefixComparator, prefixComputer, pageSize, 
canUseRadixSort)
 
     if (testSpillFrequency > 0) {
-      rowSorter.setTestSpillFrequency(testSpillFrequency)
+      newRowSorter.setTestSpillFrequency(testSpillFrequency)
     }
-    rowSorter
+    rowSorter.set(newRowSorter)
+    rowSorter.get()
   }
 
   protected override def doExecute(): RDD[InternalRow] = {
@@ -194,11 +199,11 @@ case class SortExec(
    * In SortExec, we overwrites cleanupResources to close 
UnsafeExternalRowSorter.
    */
   override protected[sql] def cleanupResources(): Unit = {
-    if (rowSorter != null) {
+    if (rowSorter != null && rowSorter.get() != null) {
       // There's possible for rowSorter is null here, for example, in the 
scenario of empty
       // iterator in the current task, the downstream physical node(like 
SortMergeJoinExec) will
       // trigger cleanupResources before rowSorter initialized in createSorter.
-      rowSorter.cleanupResources()
+      rowSorter.get().cleanupResources()
     }
     super.cleanupResources()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to