weixiuli created SPARK-29551: -------------------------------- Summary: There is a bug in fetch-failure handling when an executor is lost Key: SPARK-29551 URL: https://issues.apache.org/jira/browse/SPARK-29551 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.4.3 Reporter: weixiuli Fix For: 2.4.3
There is a regression when an executor is lost and that loss then causes a 'fetch failed' error. We can add a unit test to 'DAGSchedulerSuite.scala' to reproduce the problem. {code} test("All shuffle files on the slave should be cleaned up when slave lost test") { // reset the test context with the right shuffle service config afterEach() val conf = new SparkConf() conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true") conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true") init(conf) runEvent(ExecutorAdded("exec-hostA1", "hostA")) runEvent(ExecutorAdded("exec-hostA2", "hostA")) runEvent(ExecutorAdded("exec-hostB", "hostB")) val firstRDD = new MyRDD(sc, 3, Nil) val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3)) val firstShuffleId = firstShuffleDep.shuffleId val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep)) val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3)) val secondShuffleId = shuffleDep.shuffleId val reduceRdd = new MyRDD(sc, 1, List(shuffleDep)) submit(reduceRdd, Array(0)) // map stage1 completes successfully, with one task on each executor complete(taskSets(0), Seq( (Success, MapStatus( BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 5)), (Success, MapStatus( BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 6)), (Success, makeMapStatus("hostB", 1, mapTaskId = 7)) )) // map stage2 completes successfully, with one task on each executor complete(taskSets(1), Seq( (Success, MapStatus( BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 8)), (Success, MapStatus( BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 9)), (Success, makeMapStatus("hostB", 1, mapTaskId = 10)) )) // make sure our test setup is correct val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get 
assert(initialMapStatus1.count(_ != null) === 3) assert(initialMapStatus1.map{_.location.executorId}.toSet === Set("exec-hostA1", "exec-hostA2", "exec-hostB")) assert(initialMapStatus1.map{_.mapId}.toSet === Set(5, 6, 7)) val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get assert(initialMapStatus2.count(_ != null) === 3) assert(initialMapStatus2.map{_.location.executorId}.toSet === Set("exec-hostA1", "exec-hostA2", "exec-hostB")) assert(initialMapStatus2.map{_.mapId}.toSet === Set(8, 9, 10)) // kill exec-hostA2 runEvent(ExecutorLost("exec-hostA2", ExecutorKilled)) // reduce stage fails with a fetch failure from map stage from exec-hostA2 complete(taskSets(2), Seq( (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), secondShuffleId, 0L, 0, 0, "ignored"), null) )) // Here is the main assertion -- make sure that we de-register // the map outputs for both map stage from both executors on hostA val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses assert(mapStatus1.count(_ != null) === 1) assert(mapStatus1(2).location.executorId === "exec-hostB") assert(mapStatus1(2).location.host === "hostB") val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses assert(mapStatus2.count(_ != null) === 1) assert(mapStatus2(2).location.executorId === "exec-hostB") assert(mapStatus2(2).location.host === "hostB") } {code} The error output is: {code} 3 did not equal 1 ScalaTestFailureLocation: org.apache.spark.scheduler.DAGSchedulerSuite at (DAGSchedulerSuite.scala:609) Expected :1 Actual :3 <Click to see difference> org.scalatest.exceptions.TestFailedException: 3 did not equal 1 {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org