This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fec2d5d8f0d6 [SPARK-54796][CORE] Fix NPE caused by race condition 
between Executor initialization and shuffle migration
fec2d5d8f0d6 is described below

commit fec2d5d8f0d67d6d39b9173b5ca8a9c500a1fea3
Author: Tengfei Huang <[email protected]>
AuthorDate: Tue Mar 10 11:44:14 2026 +0800

    [SPARK-54796][CORE] Fix NPE caused by race condition between Executor 
initialization and shuffle migration
    
    ### What changes were proposed in this pull request?
    Fixing the race condition between executor initialization and shuffle 
migration. When starting an `Executor`, spark will:
    - [Initialize 
blockManager](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L335);
 after this, the block manager can be detected as a peer by other nodes and 
receive shuffle block migrations.
    - Then [initialize 
shuffleManager](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L531).
    
    A shuffle migration request could be received before the shuffle manager is 
initialized; in that case `putBlockDataAsStream` will be invoked and the lazy 
`shuffleManager` will be initialized to `null`.
    ```
    private lazy val shuffleManager = 
Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)
    ```
    Then all the later operations depending on `shuffleManager` will fail with 
`NPE`.
    
    To fix the issue, this PR proposes to:
    1. Check whether shuffleManager is initialized in `putBlockDataAsStream`, 
which could be called before the Executor is fully ready to handle migration 
requests.
    2. If not ready, wait until the ShuffleManager is initialized or the 
specified timeout is exceeded.
    3. The `BlockManagerDecommissioner` will retry if the shuffle migration 
request times out while waiting for the shuffleManager to be initialized.
    
    ### Why are the changes needed?
    Fixes the race condition that leads to the `shuffleManager` in BlockManager 
being initialized to `null`.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT added.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Generated-by: Cursor 2.4.23
    
    Closes #54136 from ivoson/SPARK-54796.
    
    Lead-authored-by: Tengfei Huang <[email protected]>
    Co-authored-by: Tengfei Huang <[email protected]>
    Signed-off-by: Yi Wu <[email protected]>
---
 .../src/main/scala/org/apache/spark/SparkEnv.scala | 31 ++++++++++++-
 .../org/apache/spark/internal/config/package.scala | 10 ++++
 .../org/apache/spark/storage/BlockManager.scala    | 35 +++++++++++++-
 .../spark/storage/BlockManagerDecommissioner.scala | 42 +++++++++++------
 .../ShuffleManagerNotInitializedException.scala    | 27 +++++++++++
 .../BlockManagerDecommissionUnitSuite.scala        | 53 ++++++++++++++++++++++
 .../apache/spark/storage/BlockManagerSuite.scala   | 37 +++++++++++++--
 7 files changed, 216 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 796dbf4b6d5f..5374a8f5a015 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.io.File
+import java.util.concurrent.{CountDownLatch, TimeUnit}
 
 import scala.collection.concurrent
 import scala.collection.mutable
@@ -75,8 +76,31 @@ class SparkEnv (
   // user jars to define custom ShuffleManagers.
   @volatile private var _shuffleManager: ShuffleManager = _
 
+  // Latch to signal when the ShuffleManager has been initialized.
+  // Used to allow callers to wait for initialization.
+  private val shuffleManagerInitLatch = new CountDownLatch(1)
+
   def shuffleManager: ShuffleManager = _shuffleManager
 
+  /**
+   * Wait for the ShuffleManager to be initialized within the specified 
timeout.
+   *
+   * @param timeoutMs Maximum time to wait in milliseconds
+   * @return true if the ShuffleManager was initialized within the timeout, 
false otherwise
+   */
+  private[spark] def waitForShuffleManagerInit(timeoutMs: Long): Boolean = {
+    shuffleManagerInitLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
+  }
+
+  /**
+   * Check if the ShuffleManager has been initialized.
+   *
+   * @return true if the ShuffleManager is initialized, false otherwise
+   */
+  private[spark] def isShuffleManagerInitialized: Boolean = {
+    _shuffleManager != null
+  }
+
   // We initialize the MemoryManager later in SparkContext after DriverPlugin 
is loaded
   // to allow the plugin to overwrite executor memory configurations
   private var _memoryManager: MemoryManager = _
@@ -223,7 +247,12 @@ class SparkEnv (
   private[spark] def initializeShuffleManager(): Unit = {
     Preconditions.checkState(null == _shuffleManager,
       "Shuffle manager already initialized to %s", _shuffleManager)
-    _shuffleManager = ShuffleManager.create(conf, executorId == 
SparkContext.DRIVER_IDENTIFIER)
+    try {
+      _shuffleManager = ShuffleManager.create(conf, executorId == 
SparkContext.DRIVER_IDENTIFIER)
+    } finally {
+      // Signal that the ShuffleManager has been initialized
+      shuffleManagerInitLatch.countDown()
+    }
   }
 
   private[spark] def initializeMemoryManager(numUsableCores: Int): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1fcf37b6e06b..9fee7a36a044 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -646,6 +646,16 @@ package object config {
       .bytesConf(ByteUnit.BYTE)
       .createOptional
 
+  private[spark] val STORAGE_SHUFFLE_MANAGER_INIT_WAITING_TIMEOUT =
+    ConfigBuilder("spark.storage.shuffleManager.initWaitingTimeout")
+      .doc("Maximum time to wait for the ShuffleManager to be initialized when 
receiving " +
+        "shuffle migration requests. If the ShuffleManager is not initialized 
within this " +
+        "timeout, the migration request will be rejected and the sender should 
retry.")
+      .version("4.2.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(_ > 0, "Timeout should be positive.")
+      .createWithDefaultString("30s")
+
   private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
     ConfigBuilder("spark.storage.replication.topologyFile")
       .version("2.1.0")
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0d675a3abd12..11d92e998214 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -196,7 +196,14 @@ private[spark] class BlockManager(
   // We initialize the ShuffleManager later in SparkContext and Executor, to 
allow
   // user jars to define custom ShuffleManagers, as such `_shuffleManager` 
will be null here
   // (except for tests) and we ask for the instance from the SparkEnv.
-  private lazy val shuffleManager = 
Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)
+  private lazy val shuffleManager = {
+    Option(_shuffleManager).getOrElse {
+      // Wait for ShuffleManager to be initialized before handling shuffle 
operations.
+      // Exception will be thrown if it is not initialized within the 
configured timeout.
+      waitForShuffleManagerInit()
+      SparkEnv.get.shuffleManager
+    }
+  }
 
   // Similarly, we also initialize MemoryManager later after DriverPlugin is 
loaded, to
   // allow the plugin to overwrite certain memory configurations. The 
`_memoryManager` will be
@@ -314,6 +321,32 @@ private[spark] class BlockManager(
     shuffleManager.shuffleBlockResolver.asInstanceOf[MigratableResolver]
   }
 
+  // Timeout waiting for ShuffleManager initialization when receiving shuffle 
migration requests
+  private val shuffleManagerInitWaitingTimeoutMs =
+    conf.get(config.STORAGE_SHUFFLE_MANAGER_INIT_WAITING_TIMEOUT)
+
+  /**
+   * Wait for the ShuffleManager to be initialized before handling shuffle 
migration requests.
+   * This is necessary because BlockManager is registered with the driver 
before ShuffleManager
+   * is initialized in Executor, which could cause NPE if shuffle migration 
requests are received
+   * before ShuffleManager is ready.
+   *
+   * @throws ShuffleManagerNotInitializedException if ShuffleManager is not 
initialized within
+   *         the configured timeout
+   */
+  private def waitForShuffleManagerInit(): Unit = {
+    if (!SparkEnv.get.isShuffleManagerInitialized) {
+      logInfo(log"Waiting for ShuffleManager initialization before handling 
shuffle operations")
+
+      if 
(!SparkEnv.get.waitForShuffleManagerInit(shuffleManagerInitWaitingTimeoutMs)) {
+        logWarning(log"ShuffleManager not initialized within " +
+          log"${MDC(TIMEOUT, shuffleManagerInitWaitingTimeoutMs)}ms " +
+          log"while handling shuffle operations")
+        throw new 
ShuffleManagerNotInitializedException(shuffleManagerInitWaitingTimeoutMs)
+      }
+    }
+  }
+
   override def getLocalDiskDirs: Array[String] = 
diskBlockManager.localDirsString
 
   /**
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index 1a067fab04c1..5d4d745d191d 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -45,6 +45,8 @@ private[storage] class BlockManagerDecommissioner(
     conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
   private val blockSavedOnDecommissionedBlockManagerException =
     classOf[BlockSavedOnDecommissionedBlockManagerException].getSimpleName
+  private val shuffleManagerNotInitializedException =
+    classOf[ShuffleManagerNotInitializedException].getSimpleName
 
   // Used for tracking if our migrations are complete. Readable for testing
   @volatile private[storage] var lastRDDMigrationTime: Long = 0
@@ -105,7 +107,11 @@ private[storage] class BlockManagerDecommissioner(
         try {
           val (shuffleBlockInfo, retryCount) = nextShuffleBlockToMigrate()
           val blocks = 
bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
-          var isTargetDecommissioned = false
+          var needRetry = false
+          // By default, increment the failure count on retry. Transient 
failures
+          // (e.g. target decommissioned, ShuffleManager not ready) reset this 
to
+          // retryCount to avoid penalizing the block.
+          var newRetryCount = retryCount + 1
           // We only migrate a shuffle block when both index file and data 
file exist.
           if (blocks.isEmpty) {
             logInfo(log"Ignore deleted shuffle block ${MDC(SHUFFLE_BLOCK_INFO, 
shuffleBlockInfo)}")
@@ -155,33 +161,43 @@ private[storage] class BlockManagerDecommissioner(
                 } else if (e.getCause != null && e.getCause.getMessage != null
                   && e.getCause.getMessage
                   .contains(blockSavedOnDecommissionedBlockManagerException)) {
-                  isTargetDecommissioned = true
+                  // Target is decommissioned, don't penalize the block.
                   keepRunning = false
+                  needRetry = true
+                  newRetryCount = retryCount
+                } else if (e.getCause != null && e.getCause.getMessage != null
+                  && e.getCause.getMessage
+                  .contains(shuffleManagerNotInitializedException)) {
+                  // Target executor's ShuffleManager is not yet initialized.
+                  // Still increment retry count to guarantee termination if 
the
+                  // target permanently fails to initialize.
+                  logWarning(log"Target executor's ShuffleManager not 
initialized for " +
+                    log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}. Will 
retry.")
+                  needRetry = true
                 } else {
                   logError(log"Error occurred during migrating " +
                     log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}", e)
                   keepRunning = false
+                  needRetry = true
                 }
               case e: Exception =>
                 logError(log"Error occurred during migrating " +
                   log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}", e)
                 keepRunning = false
+                needRetry = true
             }
           }
-          if (keepRunning) {
-            numMigratedShuffles.incrementAndGet()
-          } else {
-            logWarning(log"Stop migrating shuffle blocks to ${MDC(PEER, 
peer)}")
-
-            val newRetryCount = if (isTargetDecommissioned) {
-              retryCount
-            } else {
-              retryCount + 1
-            }
-            // Do not mark the block as migrated if it still needs retry
+          // needRetry: whether the block should be requeued for retry.
+          // keepRunning: whether the migration thread should continue for 
this peer.
+          if (needRetry) {
             if (!allowRetry(shuffleBlockInfo, newRetryCount)) {
               numMigratedShuffles.incrementAndGet()
             }
+          } else {
+            numMigratedShuffles.incrementAndGet()
+          }
+          if (!keepRunning) {
+            logWarning(log"Stop migrating shuffle blocks to ${MDC(PEER, 
peer)}")
           }
         } catch {
           case _: InterruptedException =>
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleManagerNotInitializedException.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleManagerNotInitializedException.scala
new file mode 100644
index 000000000000..936710b0e4d2
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleManagerNotInitializedException.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+/**
+ * Exception thrown when a shuffle migration request is received but the 
ShuffleManager
+ * has not been initialized yet on the target executor. The sender should 
retry the
+ * migration request after a short delay.
+ */
+class ShuffleManagerNotInitializedException(
+    timeoutMs: Long) extends Exception(
+    s"ShuffleManager not initialized within ${timeoutMs}ms.")
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
index b7ad6722faa8..ab6c19575d23 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
@@ -18,12 +18,15 @@
 package org.apache.spark.storage
 
 import java.io.FileNotFoundException
+import java.util.concurrent.atomic.AtomicLong
 
 import scala.concurrent.Future
 import scala.concurrent.duration._
 
 import org.mockito.{ArgumentMatchers => mc}
 import org.mockito.Mockito.{atLeast => least, mock, never, times, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.matchers.must.Matchers
 
@@ -271,6 +274,56 @@ class BlockManagerDecommissionUnitSuite extends 
SparkFunSuite with Matchers {
         mc.any(), mc.any(), mc.any(), mc.isNull())
   }
 
+  test("SPARK-54796: block decom manager handles 
ShuffleManagerNotInitializedException " +
+    "with retry on same peer") {
+    // Set up the mocks so we return one shuffle block
+    val conf = sparkConf
+      .clone
+      .set(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK, 3)
+    val bm = mock(classOf[BlockManager])
+    val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+    registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1)))
+    when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+    when(bm.getMigratableRDDBlocks())
+      .thenReturn(Seq())
+    val exe1 = BlockManagerId("exec1", "host1", 12345)
+    when(bm.getPeers(mc.any()))
+      .thenReturn(Seq(exe1))
+
+    val blockTransferService = mock(classOf[BlockTransferService])
+    val uploadAttempts = new AtomicLong(0)
+    // Simulate ShuffleManagerNotInitializedException on first attempt,
+    // then succeed on retry to the same peer (transient condition resolved)
+    when(blockTransferService.uploadBlock(
+      mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.isNull()))
+      .thenAnswer(new Answer[Future[Unit]] {
+        override def answer(invocation: InvocationOnMock): Future[Unit] = {
+          val attempt = uploadAttempts.incrementAndGet()
+          if (attempt == 1) {
+            // First attempt fails with ShuffleManagerNotInitializedException
+            Future.failed(new 
RuntimeException("ShuffleManagerNotInitializedException"))
+          } else {
+            // Subsequent attempts succeed (ShuffleManager is now initialized)
+            Future.successful(())
+          }
+        }
+      })
+    when(blockTransferService.uploadBlockSync(
+      mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.isNull()))
+      .thenCallRealMethod()
+
+    when(bm.blockTransferService).thenReturn(blockTransferService)
+
+    // The migration thread keeps running for the same peer and retries 
succeed.
+    val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+    validateDecommissionTimestampsOnManager(bmDecomManager, numShuffles = 
Some(1))
+    // 1 failed attempt + 2 successful sub-block uploads (index + data) on 
retry,
+    // all to the same peer since the thread keeps running
+    verify(blockTransferService, times(3))
+      .uploadBlock(mc.any(), mc.any(), mc.eq(exe1.executorId),
+        mc.any(), mc.any(), mc.any(), mc.isNull())
+  }
+
   test("block decom manager handles IO failures") {
     // Set up the mocks so we return one shuffle block
     val bm = mock(classOf[BlockManager])
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f4bb5b7cf7cb..8dcc1b798181 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -185,6 +185,11 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with PrivateMethodTe
     conf.set(DRIVER_PORT, rpcEnv.address.port)
     conf.set(DRIVER_HOST_ADDRESS, rpcEnv.address.host)
 
+    val env = mock(classOf[SparkEnv])
+    when(env.conf).thenReturn(conf)
+    when(env.isShuffleManagerInitialized).thenReturn(true)
+    SparkEnv.set(env)
+
     // Mock SparkContext to reduce the memory usage of tests. It's fine since 
the only reason we
     // need to create a SparkContext is to initialize LiveListenerBus.
     sc = mock(classOf[SparkContext])
@@ -215,6 +220,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers 
with PrivateMethodTe
       rpcEnv = null
       master = null
       liveListenerBus = null
+      SparkEnv.set(null)
     } finally {
       super.afterEach()
     }
@@ -2030,6 +2036,33 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with PrivateMethodTe
     assert(exception.getMessage.contains("unsupported shuffle resolver"))
   }
 
+  test("SPARK-54796: putBlockDataAsStream throws 
ShuffleManagerNotInitializedException " +
+    "on timeout") {
+    val bm = makeBlockManager(1000, "exec2", testConf = Some(conf),
+      shuffleManager = null)
+    val sortShuffleMgr = makeSortShuffleManager(Some(conf))
+    sortShuffleMgr.shuffleBlockResolver._blockManager = bm
+
+    // Create a shuffle block ID
+    val shuffleBlockId = ShuffleDataBlockId(0, 0, 0)
+
+    // Mock SparkEnv to simulate uninitialized ShuffleManager
+    val mockEnv = SparkEnv.get
+    when(mockEnv.isShuffleManagerInitialized).thenReturn(false)
+    when(mockEnv.waitForShuffleManagerInit(mc.anyLong())).thenReturn(false)
+
+    val exception = intercept[ShuffleManagerNotInitializedException] {
+      bm.putBlockDataAsStream(shuffleBlockId, StorageLevel.DISK_ONLY, 
ClassTag(classOf[String]))
+    }
+    assert(exception.getMessage.contains("ShuffleManager not initialized"))
+
+    // Retry initialization should succeed once ShuffleManager is initialized
+    when(mockEnv.isShuffleManagerInitialized).thenReturn(true)
+    when(mockEnv.waitForShuffleManagerInit(mc.anyLong())).thenReturn(true)
+    when(mockEnv.shuffleManager).thenReturn(sortShuffleMgr)
+    assert(bm.migratableResolver != null)
+  }
+
   test("test decommission block manager should not be part of peers") {
     val exec1 = "exec1"
     val exec2 = "exec2"
@@ -2128,10 +2161,6 @@ class BlockManagerSuite extends SparkFunSuite with 
Matchers with PrivateMethodTe
       assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location === 
bm1.blockManagerId)
       assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(1).location === 
bm1.blockManagerId)
 
-      val env = mock(classOf[SparkEnv])
-      when(env.conf).thenReturn(conf)
-      SparkEnv.set(env)
-
       decomManager.refreshMigratableShuffleBlocks()
 
       if (willReject) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to