This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cc787d6ce3b [SPARK-55505][SQL] Fix NPE on reading 
EXECUTION_ROOT_ID_KEY in concurrent scenarios
4cc787d6ce3b is described below

commit 4cc787d6ce3b5d8760cb1320b08c26b3d1ea372d
Author: Yicong Huang <[email protected]>
AuthorDate: Thu Feb 19 09:07:04 2026 +0900

    [SPARK-55505][SQL] Fix NPE on reading EXECUTION_ROOT_ID_KEY in concurrent 
scenarios
    
    ### What changes were proposed in this pull request?
    
    Two fixes:
    
    1. **`SQLExecution.withNewExecutionId0`**: Avoid the redundant re-read of 
`EXECUTION_ROOT_ID_KEY` from local properties — reuse the value already read, 
or the newly assigned `executionId` when this is the root execution.
    
    2. **`AsyncRDDActions.takeAsync`**: Add missing `Utils.cloneProperties` 
call when propagating local properties to thread pool threads, consistent with 
`withThreadLocalCaptured`, `DAGScheduler`, `JobScheduler`, and `SparkOperation`.
    
    ### Why are the changes needed?
    
    The `AsyncRDDActions.takeAsync` bug is a long-standing issue since 2015 
(#9264). It passes the raw `Properties` reference (via `getLocalProperties`) to 
a thread pool thread without cloning. This causes two threads to share the same 
`Properties` instance, breaking thread-local isolation. The bug was not exposed 
until gaogaotiantian recently changed the test to run in parallel in #53725.
    
    When one thread clears `EXECUTION_ROOT_ID_KEY` in its finally block (line 
267), the other thread sees `null` on re-read (line 115) and calls `.toLong` 
on it, causing `NumberFormatException: Cannot parse null string`.
    
    CI failure: 
https://github.com/Yicong-Huang/spark/actions/runs/21961599500/attempts/1
    (Attempt 1 failed, attempt 2 passed. Failed test: 
`test_save_load_pipeline_estimator` in `CrossValidatorIOPipelineTests`)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests. The flaky test `test_save_load_pipeline_estimator` in 
`CrossValidatorIOPipelineTests` exercises the concurrent scenario that triggers 
this bug.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #54291 from Yicong-Huang/SPARK-55505/fix/execution-root-id-npe.
    
    Authored-by: Yicong Huang <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala     | 4 ++--
 .../main/scala/org/apache/spark/sql/execution/SQLExecution.scala   | 7 +++++--
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala 
b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 2d72b4dd6bf2..d18ce9e7fc52 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -26,8 +26,8 @@ import scala.reflect.ClassTag
 import org.apache.spark.{ComplexFutureAction, FutureAction, JobSubmitter}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{RDD_LIMIT_INITIAL_NUM_PARTITIONS, 
RDD_LIMIT_SCALE_UP_FACTOR}
+import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.util.ArrayImplicits._
-import org.apache.spark.util.ThreadUtils
 
 /**
  * A set of asynchronous RDD actions available through an implicit conversion.
@@ -68,7 +68,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends 
Serializable with Loggi
    */
   def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope {
     val callSite = self.context.getCallSite()
-    val localProperties = self.context.getLocalProperties
+    val localProperties = 
Utils.cloneProperties(self.context.getLocalProperties)
     // Cached thread pool to handle aggregation of subtasks.
     implicit val executionContext = AsyncRDDActions.futureExecutionContext
     val results = new ArrayBuffer[T]
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index cf26b2991652..96a0053f97b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -108,11 +108,14 @@ object SQLExecution extends Logging {
     // Track the "root" SQL Execution Id for nested/sub queries. The current 
execution is the
     // root execution if the root execution ID is null.
     // And for the root execution, rootExecutionId == executionId.
-    if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == null) {
+    val existingRootId = sc.getLocalProperty(EXECUTION_ROOT_ID_KEY)
+    val rootExecutionId = if (existingRootId != null) {
+      existingRootId.toLong
+    } else {
       sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, executionId.toString)
       sc.addJobTag(executionIdJobTag(sparkSession, executionId))
+      executionId
     }
-    val rootExecutionId = sc.getLocalProperty(EXECUTION_ROOT_ID_KEY).toLong
     executionIdToQueryExecution.put(executionId, queryExecution)
     val originalInterruptOnCancel = 
sc.getLocalProperty(SPARK_JOB_INTERRUPT_ON_CANCEL)
     if (originalInterruptOnCancel == null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to