This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4cc787d6ce3b [SPARK-55505][SQL] Fix NPE on reading EXECUTION_ROOT_ID_KEY in concurrent scenarios
4cc787d6ce3b is described below
commit 4cc787d6ce3b5d8760cb1320b08c26b3d1ea372d
Author: Yicong Huang <[email protected]>
AuthorDate: Thu Feb 19 09:07:04 2026 +0900
[SPARK-55505][SQL] Fix NPE on reading EXECUTION_ROOT_ID_KEY in concurrent scenarios
### What changes were proposed in this pull request?
Two fixes:
1. **`SQLExecution.withNewExecutionId0`**: Avoid redundant re-read of
`EXECUTION_ROOT_ID_KEY` from local properties — reuse the value already
available.
2. **`AsyncRDDActions.takeAsync`**: Add missing `Utils.cloneProperties`
call when propagating local properties to thread pool threads, consistent with
`withThreadLocalCaptured`, `DAGScheduler`, `JobScheduler`, and `SparkOperation`.
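The defensive-copy pattern in fix 2 can be sketched with plain `java.util.Properties` (an illustrative Java sketch, not the actual Spark code; `submitWithSnapshot` is a hypothetical helper standing in for `takeAsync` capturing a `Utils.cloneProperties` copy):

```java
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CloneBeforeSubmit {
    // Hypothetical helper: snapshot the caller's properties before the task
    // runs on a pool thread, so later mutations by the caller stay invisible.
    static Future<String> submitWithSnapshot(ExecutorService pool, Properties live, String key) {
        Properties snapshot = (Properties) live.clone(); // defensive copy, like Utils.cloneProperties
        return pool.submit(() -> snapshot.getProperty(key));
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Properties props = new Properties();
        props.setProperty("spark.sql.execution.root.id", "42");
        Future<String> f = submitWithSnapshot(pool, props, "spark.sql.execution.root.id");
        props.remove("spark.sql.execution.root.id"); // caller clears, as in a finally block
        System.out.println(f.get()); // snapshot still sees "42"
        pool.shutdown();
    }
}
```

Because the snapshot is taken before the task is handed to the pool, the submitter's later `remove` (mirroring the `finally` block that clears `EXECUTION_ROOT_ID_KEY`) cannot affect what the pool thread reads.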
### Why are the changes needed?
The `AsyncRDDActions.takeAsync` bug is a long-standing issue since 2015
(#9264). It passes the raw `Properties` reference (via `getLocalProperties`) to
a thread pool thread without cloning. This causes two threads to share the same
`Properties` instance, breaking thread-local isolation. The bug went unnoticed
until gaogaotiantian recently changed the test to run in parallel in #53725.
When one thread clears `EXECUTION_ROOT_ID_KEY` in its finally block (line
267), the other thread sees `null` on re-read (line 115), causing
`NumberFormatException: Cannot parse null string`.
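The interleaving reduces to a null re-read of a shared `Properties` instance; a minimal single-threaded Java sketch of the failure mode (illustrative only, not Spark code):

```java
import java.util.Properties;

public class SharedPropertiesRace {
    public static void main(String[] args) {
        // Both "threads" see the same Properties instance because it was
        // passed by reference instead of being cloned.
        Properties shared = new Properties();
        shared.setProperty("spark.sql.execution.root.id", "7");

        // Thread A's finally block clears the key...
        shared.remove("spark.sql.execution.root.id");

        // ...so thread B's re-read gets null, and parsing null throws.
        try {
            Long.parseLong(shared.getProperty("spark.sql.execution.root.id"));
            System.out.println("parsed");
        } catch (NumberFormatException e) {
            // Message is "Cannot parse null string" on recent JDKs.
            System.out.println("NumberFormatException");
        }
    }
}
```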
CI failure:
https://github.com/Yicong-Huang/spark/actions/runs/21961599500/attempts/1
(Attempt 1 failed, attempt 2 passed. Failed test:
`test_save_load_pipeline_estimator` in `CrossValidatorIOPipelineTests`)
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests. The flaky test `test_save_load_pipeline_estimator` in
`CrossValidatorIOPipelineTests` exercises the concurrent scenario that triggers
this bug.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #54291 from Yicong-Huang/SPARK-55505/fix/execution-root-id-npe.
Authored-by: Yicong Huang <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala | 4 ++--
.../main/scala/org/apache/spark/sql/execution/SQLExecution.scala | 7 +++++--
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 2d72b4dd6bf2..d18ce9e7fc52 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -26,8 +26,8 @@ import scala.reflect.ClassTag
import org.apache.spark.{ComplexFutureAction, FutureAction, JobSubmitter}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{RDD_LIMIT_INITIAL_NUM_PARTITIONS, RDD_LIMIT_SCALE_UP_FACTOR}
+import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.ArrayImplicits._
-import org.apache.spark.util.ThreadUtils
/**
* A set of asynchronous RDD actions available through an implicit conversion.
@@ -68,7 +68,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
*/
def takeAsync(num: Int): FutureAction[Seq[T]] = self.withScope {
val callSite = self.context.getCallSite()
- val localProperties = self.context.getLocalProperties
+ val localProperties = Utils.cloneProperties(self.context.getLocalProperties)
// Cached thread pool to handle aggregation of subtasks.
implicit val executionContext = AsyncRDDActions.futureExecutionContext
val results = new ArrayBuffer[T]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
index cf26b2991652..96a0053f97b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala
@@ -108,11 +108,14 @@ object SQLExecution extends Logging {
// Track the "root" SQL Execution Id for nested/sub queries. The current execution is the
// root execution if the root execution ID is null.
// And for the root execution, rootExecutionId == executionId.
- if (sc.getLocalProperty(EXECUTION_ROOT_ID_KEY) == null) {
+ val existingRootId = sc.getLocalProperty(EXECUTION_ROOT_ID_KEY)
+ val rootExecutionId = if (existingRootId != null) {
+ existingRootId.toLong
+ } else {
sc.setLocalProperty(EXECUTION_ROOT_ID_KEY, executionId.toString)
sc.addJobTag(executionIdJobTag(sparkSession, executionId))
+ executionId
}
- val rootExecutionId = sc.getLocalProperty(EXECUTION_ROOT_ID_KEY).toLong
executionIdToQueryExecution.put(executionId, queryExecution)
val originalInterruptOnCancel = sc.getLocalProperty(SPARK_JOB_INTERRUPT_ON_CANCEL)
if (originalInterruptOnCancel == null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]