This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 16e2604  Revert "[SPARK-36575][CORE] Should ignore task finished event if its task set is gone in TaskSchedulerImpl.handleSuccessfulTask"
16e2604 is described below

commit 16e26049afc8ba92b06bdc58c47b211ea87e0d2b
Author: yi.wu <yi...@databricks.com>
AuthorDate: Wed Nov 10 15:18:05 2021 +0800

    Revert "[SPARK-36575][CORE] Should ignore task finished event if its task set is gone in TaskSchedulerImpl.handleSuccessfulTask"

    This reverts commit bc80c844fcb37d8d699d46bb34edadb98ed0d9f7.
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  8 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 86 +---------------------
 2 files changed, 2 insertions(+), 92 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 282f12b..55db73a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -871,13 +871,7 @@ private[spark] class TaskSchedulerImpl(
       taskSetManager: TaskSetManager,
       tid: Long,
       taskResult: DirectTaskResult[_]): Unit = synchronized {
-    if (taskIdToTaskSetManager.contains(tid)) {
-      taskSetManager.handleSuccessfulTask(tid, taskResult)
-    } else {
-      logInfo(s"Ignoring update with state finished for task (TID $tid) because its task set " +
-        "is gone (this is likely the result of receiving duplicate task finished status updates)" +
-        " or its executor has been marked as failed.")
-    }
+    taskSetManager.handleSuccessfulTask(tid, taskResult)
   }
 
   def handleFailedTask(
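Context for the hunk above: the reverted change guarded the call to taskSetManager.handleSuccessfulTask with a lookup in taskIdToTaskSetManager, so a result arriving after its task set had already been removed (for example, after executorLost cleaned it up) was logged and dropped rather than delivered to a stale TaskSetManager. The following is a minimal, self-contained Scala sketch of that guard pattern only; the names (ResultRouter, taskIdToOwner, register, deregister) are hypothetical stand-ins and not the Spark source.

import scala.collection.mutable

// Hypothetical stand-ins for TaskSchedulerImpl internals; illustration only, not Spark code.
class ResultRouter {
  // Plays the role of taskIdToTaskSetManager: maps a running task ID to its owning task set.
  private val taskIdToOwner = mutable.Map.empty[Long, String]

  def register(tid: Long, owner: String): Unit = synchronized { taskIdToOwner(tid) = owner }

  // Simulates the cleanup that happens when a task set is torn down (e.g. after executorLost).
  def deregister(tid: Long): Unit = synchronized { taskIdToOwner -= tid }

  // Shape of the guarded handler that this commit reverts: drop results whose task set is gone.
  def handleSuccessfulTask(tid: Long, result: Int): Unit = synchronized {
    taskIdToOwner.get(tid) match {
      case Some(owner) => println(s"TID $tid: delivering result $result to $owner")
      case None        => println(s"TID $tid: ignoring finished event, task set is gone")
    }
  }
}

object ResultRouterDemo {
  def main(args: Array[String]): Unit = {
    val router = new ResultRouter
    router.register(1L, "TaskSet_0.0")
    router.handleSuccessfulTask(1L, 42) // delivered: the task set still owns TID 1
    router.deregister(1L)               // task set removed before the result was processed
    router.handleSuccessfulTask(1L, 42) // dropped by the guard; with the revert it would be delivered
  }
}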
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 551d55d..53dc14c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -18,12 +18,9 @@ package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
-import java.util.Properties
-import java.util.concurrent.{CountDownLatch, ExecutorService, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.duration._
-import scala.language.reflectiveCalls
 
 import org.mockito.ArgumentMatchers.{any, anyInt, anyString, eq => meq}
 import org.mockito.Mockito.{atLeast, atMost, never, spy, times, verify, when}
 
@@ -37,7 +34,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
-import org.apache.spark.util.{Clock, ManualClock, ThreadUtils}
+import org.apache.spark.util.{Clock, ManualClock}
 
 class FakeSchedulerBackend extends SchedulerBackend {
   def start(): Unit = {}
@@ -1998,87 +1995,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!normalTSM.runningTasksSet.contains(taskId))
   }
 
-  test("SPARK-36575: Should ignore task finished event if its task set is gone " +
-    "in TaskSchedulerImpl.handleSuccessfulTask") {
-    val taskScheduler = setupScheduler()
-
-    val latch = new CountDownLatch(2)
-    val resultGetter = new TaskResultGetter(sc.env, taskScheduler) {
-      override protected val getTaskResultExecutor: ExecutorService =
-        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable],
-          ThreadUtils.namedThreadFactory("task-result-getter")) {
-          override def execute(command: Runnable): Unit = {
-            super.execute(new Runnable {
-              override def run(): Unit = {
-                command.run()
-                latch.countDown()
-              }
-            })
-          }
-        }
-      def taskResultExecutor() : ExecutorService = getTaskResultExecutor
-    }
-    taskScheduler.taskResultGetter = resultGetter
-
-    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
-      new WorkerOffer("executor1", "host1", 1))
-    val task1 = new ShuffleMapTask(1, 0, null, new Partition {
-      override def index: Int = 0
-    }, Seq(TaskLocation("host0", "executor0")), new Properties, null)
-
-    val task2 = new ShuffleMapTask(1, 0, null, new Partition {
-      override def index: Int = 1
-    }, Seq(TaskLocation("host1", "executor1")), new Properties, null)
-
-    val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0)
-
-    taskScheduler.submitTasks(taskSet)
-    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
-    assert(2 === taskDescriptions.length)
-
-    val ser = sc.env.serializer.newInstance()
-    val directResult = new DirectTaskResult[Int](ser.serialize(1), Seq(), Array.empty)
-    val resultBytes = ser.serialize(directResult)
-
-    val busyTask = new Runnable {
-      val lock : Object = new Object
-      override def run(): Unit = {
-        lock.synchronized {
-          lock.wait()
-        }
-      }
-      def markTaskDone: Unit = {
-        lock.synchronized {
-          lock.notify()
-        }
-      }
-    }
-    // make getTaskResultExecutor busy
-    resultGetter.taskResultExecutor().submit(busyTask)
-
-    // task1 finished
-    val tid = taskDescriptions(0).taskId
-    taskScheduler.statusUpdate(
-      tid = tid,
-      state = TaskState.FINISHED,
-      serializedData = resultBytes
-    )
-
-    // mark executor heartbeat timed out
-    taskScheduler.executorLost(taskDescriptions(0).executorId, ExecutorProcessLost("Executor " +
-      "heartbeat timed out"))
-
-    busyTask.markTaskDone
-
-    // Wait until all events are processed
-    latch.await()
-
-    val taskSetManager = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions(1).taskId)
-    assert(taskSetManager != null)
-    assert(0 == taskSetManager.tasksSuccessful)
-    assert(!taskSetManager.successful(taskDescriptions(0).index))
-  }
-
   /**
    * Used by tests to simulate a task failure. This calls the failure handler explicitly, to ensure
    * that all the state is updated when this method returns. Otherwise, there's no way to know when

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org