This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 16e2604  Revert "[SPARK-36575][CORE] Should ignore task finished event if its task set is gone in TaskSchedulerImpl.handleSuccessfulTask"
16e2604 is described below

commit 16e26049afc8ba92b06bdc58c47b211ea87e0d2b
Author: yi.wu <yi...@databricks.com>
AuthorDate: Wed Nov 10 15:18:05 2021 +0800

    Revert "[SPARK-36575][CORE] Should ignore task finished event if its task set is gone in TaskSchedulerImpl.handleSuccessfulTask"
    
    This reverts commit bc80c844fcb37d8d699d46bb34edadb98ed0d9f7.
---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  8 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 86 +---------------------
 2 files changed, 2 insertions(+), 92 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 282f12b..55db73a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -871,13 +871,7 @@ private[spark] class TaskSchedulerImpl(
       taskSetManager: TaskSetManager,
       tid: Long,
       taskResult: DirectTaskResult[_]): Unit = synchronized {
-    if (taskIdToTaskSetManager.contains(tid)) {
-      taskSetManager.handleSuccessfulTask(tid, taskResult)
-    } else {
-      logInfo(s"Ignoring update with state finished for task (TID $tid) 
because its task set " +
-        "is gone (this is likely the result of receiving duplicate task 
finished status updates)" +
-        " or its executor has been marked as failed.")
-    }
+    taskSetManager.handleSuccessfulTask(tid, taskResult)
   }
 
   def handleFailedTask(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 551d55d..53dc14c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -18,12 +18,9 @@
 package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
-import java.util.Properties
-import java.util.concurrent.{CountDownLatch, ExecutorService, 
LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
 
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.duration._
-import scala.language.reflectiveCalls
 
 import org.mockito.ArgumentMatchers.{any, anyInt, anyString, eq => meq}
 import org.mockito.Mockito.{atLeast, atMost, never, spy, times, verify, when}
@@ -37,7 +34,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, 
TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
-import org.apache.spark.util.{Clock, ManualClock, ThreadUtils}
+import org.apache.spark.util.{Clock, ManualClock}
 
 class FakeSchedulerBackend extends SchedulerBackend {
   def start(): Unit = {}
@@ -1998,87 +1995,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!normalTSM.runningTasksSet.contains(taskId))
   }
 
-  test("SPARK-36575: Should ignore task finished event if its task set is gone 
" +
-    "in TaskSchedulerImpl.handleSuccessfulTask") {
-    val taskScheduler = setupScheduler()
-
-    val latch = new CountDownLatch(2)
-    val resultGetter = new TaskResultGetter(sc.env, taskScheduler) {
-      override protected val getTaskResultExecutor: ExecutorService =
-        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue[Runnable],
-          ThreadUtils.namedThreadFactory("task-result-getter")) {
-          override def execute(command: Runnable): Unit = {
-            super.execute(new Runnable {
-              override def run(): Unit = {
-                command.run()
-                latch.countDown()
-              }
-            })
-          }
-        }
-      def taskResultExecutor() : ExecutorService = getTaskResultExecutor
-    }
-    taskScheduler.taskResultGetter = resultGetter
-
-    val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 1),
-      new WorkerOffer("executor1", "host1", 1))
-    val task1 = new ShuffleMapTask(1, 0, null, new Partition {
-      override def index: Int = 0
-    }, Seq(TaskLocation("host0", "executor0")), new Properties, null)
-
-    val task2 = new ShuffleMapTask(1, 0, null, new Partition {
-      override def index: Int = 1
-    }, Seq(TaskLocation("host1", "executor1")), new Properties, null)
-
-    val taskSet = new TaskSet(Array(task1, task2), 0, 0, 0, null, 0)
-
-    taskScheduler.submitTasks(taskSet)
-    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
-    assert(2 === taskDescriptions.length)
-
-    val ser = sc.env.serializer.newInstance()
-    val directResult = new DirectTaskResult[Int](ser.serialize(1), Seq(), 
Array.empty)
-    val resultBytes = ser.serialize(directResult)
-
-    val busyTask = new Runnable {
-      val lock : Object = new Object
-      override def run(): Unit = {
-        lock.synchronized {
-          lock.wait()
-        }
-      }
-      def markTaskDone: Unit = {
-        lock.synchronized {
-          lock.notify()
-        }
-      }
-    }
-    // make getTaskResultExecutor busy
-    resultGetter.taskResultExecutor().submit(busyTask)
-
-    // task1 finished
-    val tid = taskDescriptions(0).taskId
-    taskScheduler.statusUpdate(
-      tid = tid,
-      state = TaskState.FINISHED,
-      serializedData = resultBytes
-    )
-
-    // mark executor heartbeat timed out
-    taskScheduler.executorLost(taskDescriptions(0).executorId, 
ExecutorProcessLost("Executor " +
-      "heartbeat timed out"))
-
-    busyTask.markTaskDone
-
-    // Wait until all events are processed
-    latch.await()
-
-    val taskSetManager = 
taskScheduler.taskIdToTaskSetManager.get(taskDescriptions(1).taskId)
-    assert(taskSetManager != null)
-    assert(0 == taskSetManager.tasksSuccessful)
-    assert(!taskSetManager.successful(taskDescriptions(0).index))
-  }
-
   /**
    * Used by tests to simulate a task failure. This calls the failure handler 
explicitly, to ensure
    * that all the state is updated when this method returns. Otherwise, 
there's no way to know when

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to