This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f80041fdfdd [SPARK-38987][SHUFFLE] Throw FetchFailedException when 
merged shuffle blocks are corrupted and spark.shuffle.detectCorrupt is set to 
true
f80041fdfdd is described below

commit f80041fdfddae66bead7a3950028ee04d1b60bd2
Author: Aravind Patnam <[email protected]>
AuthorDate: Mon Jun 6 17:07:36 2022 -0500

    [SPARK-38987][SHUFFLE] Throw FetchFailedException when merged shuffle 
blocks are corrupted and spark.shuffle.detectCorrupt is set to true
    
    ### What changes were proposed in this pull request?
    Adds corruption exception handling for merged shuffle chunks when 
spark.shuffle.detectCorrupt is set to true (the default value is true).
    
    ### Why are the changes needed?
    Prior to Spark 3.0, spark.shuffle.detectCorrupt was set to true by default, 
and this configuration was one of the knobs for early corruption detection, so 
the fallback could be triggered as expected.
    
    Since Spark 3.0, although spark.shuffle.detectCorrupt is still set to 
true by default, early corruption detection is controlled by a new 
configuration, spark.shuffle.detectCorrupt.useExtraMemory, which is set to false 
by default. Thus, by default, with only Magnet enabled after Spark 
3.2.0 (internal li-3.1.1), early corruption detection is disabled and no 
fallback is triggered. Instead, execution falls through to throwing an exception 
when the corrupted blocks start being read.
    
    We handle the corrupted stream for merged blocks by throwing a 
FetchFailedException in this case. This will trigger a retry based on the 
values of spark.shuffle.detectCorrupt.useExtraMemory and 
spark.shuffle.detectCorrupt.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    - Tested on internal cluster
    - Added UT
    
    This PR works around some of the build weirdness found in PR 36601 
(https://github.com/apache/spark/pull/36601).
    It contains the exact same diff; that PR was closed and recreated here.
    
    Closes #36734 from akpatnam25/SPARK-38987.
    
    Authored-by: Aravind Patnam <[email protected]>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  22 +++-
 .../storage/ShuffleBlockFetcherIterator.scala      |   3 +
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 115 ++++++++++++++++++++-
 .../storage/ShuffleBlockFetcherIteratorSuite.scala |  28 +++++
 4 files changed, 163 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7d26d9e8d61..289296f6fdb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1885,6 +1885,16 @@ private[spark] class DAGScheduler(
               mapOutputTracker.
                 unregisterMergeResult(shuffleId, reduceId, bmAddress, 
Option(mapIndex))
             }
+          } else {
+            // Unregister the merge result of <shuffleId, reduceId> if there 
is a FetchFailed event
+            // and is not a  MetaDataFetchException which is signified by 
bmAddress being null
+            if (bmAddress != null &&
+              
bmAddress.executorId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)) {
+              assert(pushBasedShuffleEnabled, "Push based shuffle expected to 
" +
+                "be enabled when handling merge block fetch failure.")
+              mapOutputTracker.
+                unregisterMergeResult(shuffleId, reduceId, bmAddress, None)
+            }
           }
 
           if (failedStage.rdd.isBarrier()) {
@@ -2449,7 +2459,15 @@ private[spark] class DAGScheduler(
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
     logDebug(s"Considering removal of executor $execId; " +
       s"fileLost: $fileLost, currentEpoch: $currentEpoch")
-    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) 
< currentEpoch) {
+    // Check if the execId is a shuffle push merger. We do not remove the 
executor if it is,
+    // and only remove the outputs on the host.
+    val isShuffleMerger = 
execId.equals(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    if (isShuffleMerger && pushBasedShuffleEnabled) {
+      hostToUnregisterOutputs.foreach(
+        host => blockManagerMaster.removeShufflePushMergerLocation(host))
+    }
+    if (!isShuffleMerger &&
+      (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) 
< currentEpoch)) {
       executorFailureEpoch(execId) = currentEpoch
       logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
       if (pushBasedShuffleEnabled) {
@@ -2461,6 +2479,8 @@ private[spark] class DAGScheduler(
       clearCacheLocs()
     }
     if (fileLost) {
+      // When the fetch failure is for a merged shuffle chunk, 
ignoreShuffleFileLostEpoch is true
+      // and so all the files will be removed.
       val remove = if (ignoreShuffleFileLostEpoch) {
         true
       } else if (!shuffleFileLostEpoch.contains(execId) ||
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index c91aaa8ddb7..b5dd2f640b8 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -1166,6 +1166,9 @@ final class ShuffleBlockFetcherIterator(
       case ShuffleBlockBatchId(shuffleId, mapId, startReduceId, _) =>
         throw SparkCoreErrors.fetchFailedError(address, shuffleId, mapId, 
mapIndex, startReduceId,
           msg, e)
+      case ShuffleBlockChunkId(shuffleId, _, reduceId, _) =>
+        throw SparkCoreErrors.fetchFailedError(address, shuffleId,
+          SHUFFLE_PUSH_MAP_ID.toLong, SHUFFLE_PUSH_MAP_ID, reduceId, msg, e)
       case _ => throw SparkCoreErrors.failToGetNonShuffleBlockError(blockId, e)
     }
   }
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 47fb8d70e5d..e51ced860db 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -293,6 +293,10 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     override def removeExecutor(execId: String): Unit = {
       // don't need to propagate to the driver, which we don't have
     }
+
+    override def removeShufflePushMergerLocation(host: String): Unit = {
+      // don't need to propagate to the driver, which we don't have
+    }
   }
 
   /** The list of results that DAGScheduler has collected. */
@@ -4342,6 +4346,101 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     assertDataStructuresEmpty()
   }
 
+  test("SPARK-38987: corrupted merged shuffle block FetchFailure should 
unregister merge results") {
+    initPushBasedShuffleConfs(conf)
+    DAGSchedulerSuite.clearMergerLocs()
+    DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", 
"host5"))
+
+    scheduler = new MyDAGScheduler(
+      sc,
+      taskScheduler,
+      sc.listenerBus,
+      mapOutputTracker,
+      blockManagerMaster,
+      sc.env,
+      shuffleMergeFinalize = false,
+      shuffleMergeRegister = false)
+    dagEventProcessLoopTester = new 
DAGSchedulerEventProcessLoopTester(scheduler)
+
+    val parts = 2
+    val shuffleMapRdd = new MyRDD(sc, parts, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(parts))
+    val reduceRdd = new MyRDD(sc, parts, List(shuffleDep), tracker = 
mapOutputTracker)
+
+    // Submit a reduce job that depends which will create a map stage
+    submit(reduceRdd, (0 until parts).toArray)
+
+    val shuffleMapStage = 
scheduler.stageIdToStage(0).asInstanceOf[ShuffleMapStage]
+    scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+      Seq((0, makeMergeStatus("hostA", shuffleDep.shuffleMergeId, 
isShufflePushMerger = true))))
+    scheduler.handleShuffleMergeFinalized(shuffleMapStage,
+      shuffleMapStage.shuffleDep.shuffleMergeId)
+    scheduler.handleRegisterMergeStatuses(shuffleMapStage,
+      Seq((1, makeMergeStatus("hostA", shuffleDep.shuffleMergeId, 
isShufflePushMerger = true))))
+
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) 
== 1)
+
+    // Complete shuffle map stage with FetchFailed on hostA
+    complete(taskSets(0), taskSets(0).tasks.zipWithIndex.map {
+      case (task, _) =>
+        (FetchFailed(
+          makeBlockManagerId("hostA", execId = 
Some(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)),
+          shuffleDep.shuffleId, -1L, -1, 0, "corruption fetch failure"), null)
+    }.toSeq)
+    assert(mapOutputTracker.getNumAvailableMergeResults(shuffleDep.shuffleId) 
== 0)
+  }
+
+  test("SPARK-38987: All shuffle outputs for a shuffle push" +
+    " merger executor should be cleaned up on a fetch failure when" +
+    "spark.files.fetchFailure.unRegisterOutputOnHost is true") {
+    initPushBasedShuffleConfs(conf)
+    conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
+
+    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new 
HashPartitioner(3))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = 
mapOutputTracker)
+
+    submit(reduceRdd, Array(0, 1, 2))
+    // Map stage completes successfully,
+    // two tasks are run on an executor on hostA and one on an executor on 
hostB
+    completeShuffleMapStageSuccessfully(0, 0, 3, Seq("hostA", "hostA", 
"hostB"))
+    // Now the executor on hostA is lost
+    runEvent(ExecutorLost(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER,
+      ExecutorExited(-100, false, "Container marked as failed")))
+
+    // Shuffle push merger executor should not be removed and the shuffle 
files are not unregistered
+    verify(blockManagerMaster, 
times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    verify(mapOutputTracker,
+      
times(0)).removeOutputsOnExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+
+    // Now a fetch failure from the lost executor occurs
+    complete(taskSets(1), Seq(
+      (FetchFailed(BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, 
"hostA", 12345),
+        shuffleId, 0L, 0, 0, "ignored"), null)
+    ))
+
+    // Verify that we are not removing the executor,
+    // and that we are only removing the outputs on the host
+    verify(blockManagerMaster, 
times(0)).removeExecutor(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    verify(blockManagerMaster, 
times(1)).removeShufflePushMergerLocation("hostA")
+    verify(mapOutputTracker,
+      times(1)).removeOutputsOnHost("hostA")
+
+    // There should be no map statuses or merge statuses on the host
+    val shuffleStatuses = mapOutputTracker.shuffleStatuses(shuffleId)
+    val mapStatuses = shuffleStatuses.mapStatuses
+    val mergeStatuses = shuffleStatuses.mergeStatuses
+    assert(mapStatuses.count(_ != null) === 1)
+    assert(mapStatuses.count(s => s != null
+      && s.location.executorId == BlockManagerId.SHUFFLE_MERGER_IDENTIFIER) 
=== 0)
+    assert(mergeStatuses.count(s => s != null
+      && s.location.executorId == BlockManagerId.SHUFFLE_MERGER_IDENTIFIER) 
=== 0)
+    // hostB-exec should still have its shuffle files
+    assert(mapStatuses.count(s => s != null && s.location.executorId == 
"hostB-exec") === 1)
+  }
+
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its 
preferred locations.
    * Note that this checks only the host and not the executor ID.
@@ -4402,12 +4501,20 @@ object DAGSchedulerSuite {
   def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2, mapTaskId: 
Long = -1): MapStatus =
     MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes), 
mapTaskId)
 
-  def makeBlockManagerId(host: String): BlockManagerId = {
-    BlockManagerId(host + "-exec", host, 12345)
+  def makeBlockManagerId(host: String, execId: Option[String] = None): 
BlockManagerId = {
+    BlockManagerId(execId.getOrElse(host + "-exec"), host, 12345)
   }
 
-  def makeMergeStatus(host: String, shuffleMergeId: Int, size: Long = 1000): 
MergeStatus =
-    MergeStatus(makeBlockManagerId(host), shuffleMergeId, 
mock(classOf[RoaringBitmap]), size)
+  def makeMergeStatus(host: String, shuffleMergeId: Int, size: Long = 1000,
+    isShufflePushMerger: Boolean = false): MergeStatus = {
+    val execId = if (isShufflePushMerger) {
+      Some(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER)
+    } else {
+      None
+    }
+    MergeStatus(makeBlockManagerId(host, execId),
+      shuffleMergeId, mock(classOf[RoaringBitmap]), size)
+  }
 
   def addMergerLocs(locs: Seq[String]): Unit = {
     locs.foreach { loc => mergerLocs.append(makeBlockManagerId(loc)) }
diff --git 
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index e6f05251046..f8fe28c0512 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -1786,4 +1786,32 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
       ShuffleBlockId(0, 5, 2), ShuffleBlockId(0, 6, 2)))
   }
 
+  test("SPARK-38987: failure to fetch corrupted shuffle block chunk should " +
+    "throw a FetchFailedException when early detection is unable to catch 
corruption") {
+    val blockManager = mock(classOf[BlockManager])
+    val localDirs = Array("local-dir")
+    val localHost = "test-local-host"
+    val localBmId = BlockManagerId("test-client", localHost, 1)
+    doReturn(localBmId).when(blockManager).blockManagerId
+    initHostLocalDirManager(blockManager, Map(SHUFFLE_MERGER_IDENTIFIER -> 
localDirs))
+    // Prepare shuffle block chunks
+    val pushMergedBmId = BlockManagerId(SHUFFLE_MERGER_IDENTIFIER, localHost, 
1)
+    val blocksByAddress = Map[BlockManagerId, Seq[(BlockId, Long, Int)]](
+      (localBmId, toBlockList(Seq(ShuffleBlockChunkId(0, 0, 2, 0)), 1L, 1)),
+      (pushMergedBmId, toBlockList(
+        Seq(ShuffleMergedBlockId(0, 0, 2)), 2L, SHUFFLE_PUSH_MAP_ID)))
+
+    val corruptBuffer = createMockManagedBuffer(2)
+    doReturn(Seq({corruptBuffer})).when(blockManager)
+      .getLocalMergedBlockData(ShuffleMergedBlockId(0, 0, 2), localDirs)
+    val corruptStream = mock(classOf[InputStream])
+    when(corruptStream.read(any(), any(), any())).thenThrow(new 
IOException("corrupt"))
+    doReturn(corruptStream).when(corruptBuffer).createInputStream()
+    // Disable corruption detection in the iterator
+    val iterator = createShuffleBlockIteratorWithDefaults(blocksByAddress,
+      blockManager = Some(blockManager), streamWrapperLimitSize = Some(100),
+      detectCorruptUseExtraMemory = false, detectCorrupt = false)
+    intercept[FetchFailedException] { iterator.next() }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to