This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fec2d5d8f0d6 [SPARK-54796][CORE] Fix NPE caused by race condition
between Executor initialization and shuffle migration
fec2d5d8f0d6 is described below
commit fec2d5d8f0d67d6d39b9173b5ca8a9c500a1fea3
Author: Tengfei Huang <[email protected]>
AuthorDate: Tue Mar 10 11:44:14 2026 +0800
[SPARK-54796][CORE] Fix NPE caused by race condition between Executor
initialization and shuffle migration
### What changes were proposed in this pull request?
Fixing the race condition between executor initialization and shuffle
migration. When starting an `Executor`, spark will:
- [Initialize
blockManager](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L335);
after this, the block manager can be detected as a peer by other nodes and
receive shuffle block migrations.
- Then [initialize
shuffleManager](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L531).
A shuffle migration request could be received before the shuffle manager is
initialized; in that case `putBlockDataAsStream` will be invoked and
`shuffleManager` will be initialized as `null`.
```
private lazy val shuffleManager =
Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)
```
Then all the later operations depending on `shuffleManager` will fail with
`NPE`.
To fix the issue, this PR proposes to:
1. Check whether shuffleManager is initialized in `putBlockDataAsStream`,
which could be called before the Executor is fully ready to handle migration
requests.
2. If not ready, wait until the ShuffleManager is initialized or the specified timeout elapses.
3. The `BlockManagerDecommissioner` will retry if the shuffle migration
request times out waiting for the shuffleManager to be initialized.
### Why are the changes needed?
Fixing the race condition that leads to the `shuffleManager` in BlockManager
being initialized as `null`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT added.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor 2.4.23
Closes #54136 from ivoson/SPARK-54796.
Lead-authored-by: Tengfei Huang <[email protected]>
Co-authored-by: Tengfei Huang <[email protected]>
Signed-off-by: Yi Wu <[email protected]>
---
.../src/main/scala/org/apache/spark/SparkEnv.scala | 31 ++++++++++++-
.../org/apache/spark/internal/config/package.scala | 10 ++++
.../org/apache/spark/storage/BlockManager.scala | 35 +++++++++++++-
.../spark/storage/BlockManagerDecommissioner.scala | 42 +++++++++++------
.../ShuffleManagerNotInitializedException.scala | 27 +++++++++++
.../BlockManagerDecommissionUnitSuite.scala | 53 ++++++++++++++++++++++
.../apache/spark/storage/BlockManagerSuite.scala | 37 +++++++++++++--
7 files changed, 216 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 796dbf4b6d5f..5374a8f5a015 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -18,6 +18,7 @@
package org.apache.spark
import java.io.File
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.concurrent
import scala.collection.mutable
@@ -75,8 +76,31 @@ class SparkEnv (
// user jars to define custom ShuffleManagers.
@volatile private var _shuffleManager: ShuffleManager = _
+ // Latch to signal when the ShuffleManager has been initialized.
+ // Used to allow callers to wait for initialization.
+ private val shuffleManagerInitLatch = new CountDownLatch(1)
+
def shuffleManager: ShuffleManager = _shuffleManager
+ /**
+ * Wait for the ShuffleManager to be initialized within the specified
timeout.
+ *
+ * @param timeoutMs Maximum time to wait in milliseconds
+ * @return true if the ShuffleManager was initialized within the timeout,
false otherwise
+ */
+ private[spark] def waitForShuffleManagerInit(timeoutMs: Long): Boolean = {
+ shuffleManagerInitLatch.await(timeoutMs, TimeUnit.MILLISECONDS)
+ }
+
+ /**
+ * Check if the ShuffleManager has been initialized.
+ *
+ * @return true if the ShuffleManager is initialized, false otherwise
+ */
+ private[spark] def isShuffleManagerInitialized: Boolean = {
+ _shuffleManager != null
+ }
+
// We initialize the MemoryManager later in SparkContext after DriverPlugin
is loaded
// to allow the plugin to overwrite executor memory configurations
private var _memoryManager: MemoryManager = _
@@ -223,7 +247,12 @@ class SparkEnv (
private[spark] def initializeShuffleManager(): Unit = {
Preconditions.checkState(null == _shuffleManager,
"Shuffle manager already initialized to %s", _shuffleManager)
- _shuffleManager = ShuffleManager.create(conf, executorId ==
SparkContext.DRIVER_IDENTIFIER)
+ try {
+ _shuffleManager = ShuffleManager.create(conf, executorId ==
SparkContext.DRIVER_IDENTIFIER)
+ } finally {
+ // Signal that the ShuffleManager has been initialized
+ shuffleManagerInitLatch.countDown()
+ }
}
private[spark] def initializeMemoryManager(numUsableCores: Int): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1fcf37b6e06b..9fee7a36a044 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -646,6 +646,16 @@ package object config {
.bytesConf(ByteUnit.BYTE)
.createOptional
+ private[spark] val STORAGE_SHUFFLE_MANAGER_INIT_WAITING_TIMEOUT =
+ ConfigBuilder("spark.storage.shuffleManager.initWaitingTimeout")
+ .doc("Maximum time to wait for the ShuffleManager to be initialized when
receiving " +
+ "shuffle migration requests. If the ShuffleManager is not initialized
within this " +
+ "timeout, the migration request will be rejected and the sender should
retry.")
+ .version("4.2.0")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .checkValue(_ > 0, "Timeout should be positive.")
+ .createWithDefaultString("30s")
+
private[spark] val STORAGE_REPLICATION_TOPOLOGY_FILE =
ConfigBuilder("spark.storage.replication.topologyFile")
.version("2.1.0")
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0d675a3abd12..11d92e998214 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -196,7 +196,14 @@ private[spark] class BlockManager(
// We initialize the ShuffleManager later in SparkContext and Executor, to
allow
// user jars to define custom ShuffleManagers, as such `_shuffleManager`
will be null here
// (except for tests) and we ask for the instance from the SparkEnv.
- private lazy val shuffleManager =
Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)
+ private lazy val shuffleManager = {
+ Option(_shuffleManager).getOrElse {
+ // Wait for ShuffleManager to be initialized before handling shuffle
operations.
+ // Exception will be thrown if it is not initialized within the
configured timeout.
+ waitForShuffleManagerInit()
+ SparkEnv.get.shuffleManager
+ }
+ }
// Similarly, we also initialize MemoryManager later after DriverPlugin is
loaded, to
// allow the plugin to overwrite certain memory configurations. The
`_memoryManager` will be
@@ -314,6 +321,32 @@ private[spark] class BlockManager(
shuffleManager.shuffleBlockResolver.asInstanceOf[MigratableResolver]
}
+ // Timeout waiting for ShuffleManager initialization when receiving shuffle
migration requests
+ private val shuffleManagerInitWaitingTimeoutMs =
+ conf.get(config.STORAGE_SHUFFLE_MANAGER_INIT_WAITING_TIMEOUT)
+
+ /**
+ * Wait for the ShuffleManager to be initialized before handling shuffle
migration requests.
+ * This is necessary because BlockManager is registered with the driver
before ShuffleManager
+ * is initialized in Executor, which could cause NPE if shuffle migration
requests are received
+ * before ShuffleManager is ready.
+ *
+ * @throws ShuffleManagerNotInitializedException if ShuffleManager is not
initialized within
+ * the configured timeout
+ */
+ private def waitForShuffleManagerInit(): Unit = {
+ if (!SparkEnv.get.isShuffleManagerInitialized) {
+ logInfo(log"Waiting for ShuffleManager initialization before handling
shuffle operations")
+
+ if
(!SparkEnv.get.waitForShuffleManagerInit(shuffleManagerInitWaitingTimeoutMs)) {
+ logWarning(log"ShuffleManager not initialized within " +
+ log"${MDC(TIMEOUT, shuffleManagerInitWaitingTimeoutMs)}ms " +
+ log"while handling shuffle operations")
+ throw new
ShuffleManagerNotInitializedException(shuffleManagerInitWaitingTimeoutMs)
+ }
+ }
+ }
+
override def getLocalDiskDirs: Array[String] =
diskBlockManager.localDirsString
/**
diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index 1a067fab04c1..5d4d745d191d 100644
---
a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++
b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -45,6 +45,8 @@ private[storage] class BlockManagerDecommissioner(
conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
private val blockSavedOnDecommissionedBlockManagerException =
classOf[BlockSavedOnDecommissionedBlockManagerException].getSimpleName
+ private val shuffleManagerNotInitializedException =
+ classOf[ShuffleManagerNotInitializedException].getSimpleName
// Used for tracking if our migrations are complete. Readable for testing
@volatile private[storage] var lastRDDMigrationTime: Long = 0
@@ -105,7 +107,11 @@ private[storage] class BlockManagerDecommissioner(
try {
val (shuffleBlockInfo, retryCount) = nextShuffleBlockToMigrate()
val blocks =
bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
- var isTargetDecommissioned = false
+ var needRetry = false
+ // By default, increment the failure count on retry. Transient
failures
+ // (e.g. target decommissioned, ShuffleManager not ready) reset this
to
+ // retryCount to avoid penalizing the block.
+ var newRetryCount = retryCount + 1
// We only migrate a shuffle block when both index file and data
file exist.
if (blocks.isEmpty) {
logInfo(log"Ignore deleted shuffle block ${MDC(SHUFFLE_BLOCK_INFO,
shuffleBlockInfo)}")
@@ -155,33 +161,43 @@ private[storage] class BlockManagerDecommissioner(
} else if (e.getCause != null && e.getCause.getMessage != null
&& e.getCause.getMessage
.contains(blockSavedOnDecommissionedBlockManagerException)) {
- isTargetDecommissioned = true
+ // Target is decommissioned, don't penalize the block.
keepRunning = false
+ needRetry = true
+ newRetryCount = retryCount
+ } else if (e.getCause != null && e.getCause.getMessage != null
+ && e.getCause.getMessage
+ .contains(shuffleManagerNotInitializedException)) {
+ // Target executor's ShuffleManager is not yet initialized.
+ // Still increment retry count to guarantee termination if
the
+ // target permanently fails to initialize.
+ logWarning(log"Target executor's ShuffleManager not
initialized for " +
+ log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}. Will
retry.")
+ needRetry = true
} else {
logError(log"Error occurred during migrating " +
log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}", e)
keepRunning = false
+ needRetry = true
}
case e: Exception =>
logError(log"Error occurred during migrating " +
log"${MDC(SHUFFLE_BLOCK_INFO, shuffleBlockInfo)}", e)
keepRunning = false
+ needRetry = true
}
}
- if (keepRunning) {
- numMigratedShuffles.incrementAndGet()
- } else {
- logWarning(log"Stop migrating shuffle blocks to ${MDC(PEER,
peer)}")
-
- val newRetryCount = if (isTargetDecommissioned) {
- retryCount
- } else {
- retryCount + 1
- }
- // Do not mark the block as migrated if it still needs retry
+ // needRetry: whether the block should be requeued for retry.
+ // keepRunning: whether the migration thread should continue for
this peer.
+ if (needRetry) {
if (!allowRetry(shuffleBlockInfo, newRetryCount)) {
numMigratedShuffles.incrementAndGet()
}
+ } else {
+ numMigratedShuffles.incrementAndGet()
+ }
+ if (!keepRunning) {
+ logWarning(log"Stop migrating shuffle blocks to ${MDC(PEER,
peer)}")
}
} catch {
case _: InterruptedException =>
diff --git
a/core/src/main/scala/org/apache/spark/storage/ShuffleManagerNotInitializedException.scala
b/core/src/main/scala/org/apache/spark/storage/ShuffleManagerNotInitializedException.scala
new file mode 100644
index 000000000000..936710b0e4d2
--- /dev/null
+++
b/core/src/main/scala/org/apache/spark/storage/ShuffleManagerNotInitializedException.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+/**
+ * Exception thrown when a shuffle migration request is received but the
ShuffleManager
+ * has not been initialized yet on the target executor. The sender should
retry the
+ * migration request after a short delay.
+ */
+class ShuffleManagerNotInitializedException(
+ timeoutMs: Long) extends Exception(
+ s"ShuffleManager not initialized within ${timeoutMs}ms.")
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
index b7ad6722faa8..ab6c19575d23 100644
---
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
@@ -18,12 +18,15 @@
package org.apache.spark.storage
import java.io.FileNotFoundException
+import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.Future
import scala.concurrent.duration._
import org.mockito.{ArgumentMatchers => mc}
import org.mockito.Mockito.{atLeast => least, mock, never, times, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
import org.scalatest.concurrent.Eventually._
import org.scalatest.matchers.must.Matchers
@@ -271,6 +274,56 @@ class BlockManagerDecommissionUnitSuite extends
SparkFunSuite with Matchers {
mc.any(), mc.any(), mc.any(), mc.isNull())
}
+ test("SPARK-54796: block decom manager handles
ShuffleManagerNotInitializedException " +
+ "with retry on same peer") {
+ // Set up the mocks so we return one shuffle block
+ val conf = sparkConf
+ .clone
+ .set(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK, 3)
+ val bm = mock(classOf[BlockManager])
+ val migratableShuffleBlockResolver = mock(classOf[MigratableResolver])
+ registerShuffleBlocks(migratableShuffleBlockResolver, Set((1, 1L, 1)))
+ when(bm.migratableResolver).thenReturn(migratableShuffleBlockResolver)
+ when(bm.getMigratableRDDBlocks())
+ .thenReturn(Seq())
+ val exe1 = BlockManagerId("exec1", "host1", 12345)
+ when(bm.getPeers(mc.any()))
+ .thenReturn(Seq(exe1))
+
+ val blockTransferService = mock(classOf[BlockTransferService])
+ val uploadAttempts = new AtomicLong(0)
+ // Simulate ShuffleManagerNotInitializedException on first attempt,
+ // then succeed on retry to the same peer (transient condition resolved)
+ when(blockTransferService.uploadBlock(
+ mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.isNull()))
+ .thenAnswer(new Answer[Future[Unit]] {
+ override def answer(invocation: InvocationOnMock): Future[Unit] = {
+ val attempt = uploadAttempts.incrementAndGet()
+ if (attempt == 1) {
+ // First attempt fails with ShuffleManagerNotInitializedException
+ Future.failed(new
RuntimeException("ShuffleManagerNotInitializedException"))
+ } else {
+ // Subsequent attempts succeed (ShuffleManager is now initialized)
+ Future.successful(())
+ }
+ }
+ })
+ when(blockTransferService.uploadBlockSync(
+ mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.any(), mc.isNull()))
+ .thenCallRealMethod()
+
+ when(bm.blockTransferService).thenReturn(blockTransferService)
+
+ // The migration thread keeps running for the same peer and retries
succeed.
+ val bmDecomManager = new BlockManagerDecommissioner(conf, bm)
+ validateDecommissionTimestampsOnManager(bmDecomManager, numShuffles =
Some(1))
+ // 1 failed attempt + 2 successful sub-block uploads (index + data) on
retry,
+ // all to the same peer since the thread keeps running
+ verify(blockTransferService, times(3))
+ .uploadBlock(mc.any(), mc.any(), mc.eq(exe1.executorId),
+ mc.any(), mc.any(), mc.any(), mc.isNull())
+ }
+
test("block decom manager handles IO failures") {
// Set up the mocks so we return one shuffle block
val bm = mock(classOf[BlockManager])
diff --git
a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index f4bb5b7cf7cb..8dcc1b798181 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -185,6 +185,11 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with PrivateMethodTe
conf.set(DRIVER_PORT, rpcEnv.address.port)
conf.set(DRIVER_HOST_ADDRESS, rpcEnv.address.host)
+ val env = mock(classOf[SparkEnv])
+ when(env.conf).thenReturn(conf)
+ when(env.isShuffleManagerInitialized).thenReturn(true)
+ SparkEnv.set(env)
+
// Mock SparkContext to reduce the memory usage of tests. It's fine since
the only reason we
// need to create a SparkContext is to initialize LiveListenerBus.
sc = mock(classOf[SparkContext])
@@ -215,6 +220,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers
with PrivateMethodTe
rpcEnv = null
master = null
liveListenerBus = null
+ SparkEnv.set(null)
} finally {
super.afterEach()
}
@@ -2030,6 +2036,33 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with PrivateMethodTe
assert(exception.getMessage.contains("unsupported shuffle resolver"))
}
+ test("SPARK-54796: putBlockDataAsStream throws
ShuffleManagerNotInitializedException " +
+ "on timeout") {
+ val bm = makeBlockManager(1000, "exec2", testConf = Some(conf),
+ shuffleManager = null)
+ val sortShuffleMgr = makeSortShuffleManager(Some(conf))
+ sortShuffleMgr.shuffleBlockResolver._blockManager = bm
+
+ // Create a shuffle block ID
+ val shuffleBlockId = ShuffleDataBlockId(0, 0, 0)
+
+ // Mock SparkEnv to simulate uninitialized ShuffleManager
+ val mockEnv = SparkEnv.get
+ when(mockEnv.isShuffleManagerInitialized).thenReturn(false)
+ when(mockEnv.waitForShuffleManagerInit(mc.anyLong())).thenReturn(false)
+
+ val exception = intercept[ShuffleManagerNotInitializedException] {
+ bm.putBlockDataAsStream(shuffleBlockId, StorageLevel.DISK_ONLY,
ClassTag(classOf[String]))
+ }
+ assert(exception.getMessage.contains("ShuffleManager not initialized"))
+
+ // Retry initialization should succeed once ShuffleManager is initialized
+ when(mockEnv.isShuffleManagerInitialized).thenReturn(true)
+ when(mockEnv.waitForShuffleManagerInit(mc.anyLong())).thenReturn(true)
+ when(mockEnv.shuffleManager).thenReturn(sortShuffleMgr)
+ assert(bm.migratableResolver != null)
+ }
+
test("test decommission block manager should not be part of peers") {
val exec1 = "exec1"
val exec2 = "exec2"
@@ -2128,10 +2161,6 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with PrivateMethodTe
assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(0).location ===
bm1.blockManagerId)
assert(mapOutputTracker.shuffleStatuses(0).mapStatuses(1).location ===
bm1.blockManagerId)
- val env = mock(classOf[SparkEnv])
- when(env.conf).thenReturn(conf)
- SparkEnv.set(env)
-
decomManager.refreshMigratableShuffleBlocks()
if (willReject) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]