This is an automated email from the ASF dual-hosted git repository.
wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 16e2604 Revert "[SPARK-36575][CORE] Should ignore task finished event
if its task set is gone in TaskSchedulerImpl.handleSuccessfulTask"
16e2604 is described below
commit 16e26049afc8ba92b06bdc58c47b211ea87e0d2b
Author: yi.wu <[email protected]>
AuthorDate: Wed Nov 10 15:18:05 2021 +0800
Revert "[SPARK-36575][CORE] Should ignore task finished event if its task
set is gone in TaskSchedulerImpl.handleSuccessfulTask"
This reverts commit bc80c844fcb37d8d699d46bb34edadb98ed0d9f7.
---
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 8 +-
.../spark/scheduler/TaskSchedulerImplSuite.scala | 86 +---------------------
2 files changed, 2 insertions(+), 92 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 282f12b..55db73a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -871,13 +871,7 @@ private[spark] class TaskSchedulerImpl(
taskSetManager: TaskSetManager,
tid: Long,
taskResult: DirectTaskResult[_]): Unit = synchronized {
- if (taskIdToTaskSetManager.contains(tid)) {
- taskSetManager.handleSuccessfulTask(tid, taskResult)
- } else {
- logInfo(s"Ignoring update with state finished for task (TID $tid) because its task set " +
- "is gone (this is likely the result of receiving duplicate task finished status updates)" +
- " or its executor has been marked as failed.")
- }
+ taskSetManager.handleSuccessfulTask(tid, taskResult)
}
def handleFailedTask(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 551d55d..53dc14c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -18,12 +18,9 @@
package org.apache.spark.scheduler
import java.nio.ByteBuffer
-import java.util.Properties
-import java.util.concurrent.{CountDownLatch, ExecutorService, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.duration._
-import scala.language.reflectiveCalls
import org.mockito.ArgumentMatchers.{any, anyInt, anyString, eq => meq}
import org.mockito.Mockito.{atLeast, atMost, never, spy, times, verify, when}
@@ -37,7 +34,7 @@ import org.apache.spark.internal.config
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.resource.TestResourceIDs._
-import org.apache.spark.util.{Clock, ManualClock, ThreadUtils}
+import org.apache.spark.util.{Clock, ManualClock}
class FakeSchedulerBackend extends SchedulerBackend {
def start(): Unit = {}
@@ -1998,87 +1995,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(!normalTSM.runningTasksSet.contains(taskId))
}
- test("SPARK-36575: Should ignore task finished event if its task set is gone " +
- "in TaskSchedulerImpl.handleSuccessfulTask") {
- val taskScheduler = setupScheduler()
-
- val latch = new CountDownLatch(2)
- val resultGetter = new TaskResultGetter(sc.env, taskScheduler) {
- override protected val getTaskResultExecutor: ExecutorService =
- new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable],
- ThreadUtils.namedThreadFactory("task-result-getter")) {
- override def execute(command: Runnable): Unit = {
- super.execute(new Runnable {
- override def run(): Unit = {
- command.run()
- latch.countDown()
- }
- })
- }
- }
- def taskResultExecutor() : ExecutorService = getTaskResultExecutor
- }
- taskScheduler.taskResultGetter = resultGetter
-
- val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
- new WorkerOffer("executor1", "host1", 1))
- val task1 = new ShuffleMapTask(1, 0, null, new Partition {
- override def index: Int = 0
- }, Seq(TaskLocation("host0", "executor0")), new Properties, null)
-
- val task2 = new ShuffleMapTask(1, 0, null, new Partition {
- override def index: Int = 1
- }, Seq(TaskLocation("host1", "executor1")), new Properties, null)
-
- val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0)
-
- taskScheduler.submitTasks(taskSet)
- val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
- assert(2 === taskDescriptions.length)
-
- val ser = sc.env.serializer.newInstance()
- val directResult = new DirectTaskResult[Int](ser.serialize(1), Seq(), Array.empty)
- val resultBytes = ser.serialize(directResult)
-
- val busyTask = new Runnable {
- val lock : Object = new Object
- override def run(): Unit = {
- lock.synchronized {
- lock.wait()
- }
- }
- def markTaskDone: Unit = {
- lock.synchronized {
- lock.notify()
- }
- }
- }
- // make getTaskResultExecutor busy
- resultGetter.taskResultExecutor().submit(busyTask)
-
- // task1 finished
- val tid = taskDescriptions(0).taskId
- taskScheduler.statusUpdate(
- tid = tid,
- state = TaskState.FINISHED,
- serializedData = resultBytes
- )
-
- // mark executor heartbeat timed out
- taskScheduler.executorLost(taskDescriptions(0).executorId, ExecutorProcessLost("Executor " +
- "heartbeat timed out"))
-
- busyTask.markTaskDone
-
- // Wait until all events are processed
- latch.await()
-
- val taskSetManager = taskScheduler.taskIdToTaskSetManager.get(taskDescriptions(1).taskId)
- assert(taskSetManager != null)
- assert(0 == taskSetManager.tasksSuccessful)
- assert(!taskSetManager.successful(taskDescriptions(0).index))
- }
-
/**
* Used by tests to simulate a task failure. This calls the failure handler explicitly, to ensure
* that all the state is updated when this method returns. Otherwise, there's no way to know when
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]