This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b763c23  [SPARK-31643][TEST] Fix flaky o.a.s.scheduler.BarrierTaskContextSuite.barrier task killed, interrupt
b763c23 is described below

commit b763c23307a69f018b833add14b7da18fa35df02
Author: yi.wu <yi...@databricks.com>
AuthorDate: Tue May 5 12:36:42 2020 -0700

    [SPARK-31643][TEST] Fix flaky o.a.s.scheduler.BarrierTaskContextSuite.barrier task killed, interrupt
    
    ### What changes were proposed in this pull request?
    
    Make sure the task has nearly reached `context.barrier()` before killing.
    
    ### Why are the changes needed?
    
    In case the task is killed before it reaches `context.barrier()`, the task will not create the expected file.
    
    ```
    Error Message
    org.scalatest.exceptions.TestFailedException: new java.io.File(dir, killedFlagFile).exists() was false Expect barrier task being killed.
    Stacktrace
    sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: new java.io.File(dir, killedFlagFile).exists() was false Expect barrier task being killed.
        at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
        at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
        at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
        at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:503)
        at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$testBarrierTaskKilled$1(BarrierTaskContextSuite.scala:266)
        at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$testBarrierTaskKilled$1$adapted(BarrierTaskContextSuite.scala:226)
        at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:163)
        at org.apache.spark.scheduler.BarrierTaskContextSuite.testBarrierTaskKilled(BarrierTaskContextSuite.scala:226)
        at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$new$29(BarrierTaskContextSuite.scala:277)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    ```
    
    
    [Here](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/122273/testReport/org.apache.spark.scheduler/BarrierTaskContextSuite/barrier_task_killed__interrupt/) are the full error messages.
    
    ### Does this PR introduce _any_ user-facing change?

    No. This is a test-only change.

    ### How was this patch tested?
    
    Closes #28454 from Ngone51/fix_kill_interrupt.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 61a6ca5d3f623c2a8b49277ac62d77bf4dbfa84f)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../apache/spark/scheduler/BarrierTaskContextSuite.scala    | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index 2242d28..c4e5e7c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -21,10 +21,13 @@ import java.io.File
 
 import scala.util.Random
 
+import org.scalatest.concurrent.Eventually
+import org.scalatest.time.SpanSugar._
+
 import org.apache.spark._
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
 
-class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
+class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
 
   def initLocalClusterSparkContext(numWorker: Int = 4): Unit = {
     val conf = new SparkConf()
@@ -224,12 +227,14 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
 
   def testBarrierTaskKilled(interruptOnKill: Boolean): Unit = {
     withTempDir { dir =>
+      val runningFlagFile = "barrier.task.running"
       val killedFlagFile = "barrier.task.killed"
       val rdd = sc.makeRDD(Seq(0, 1), 2)
       val rdd2 = rdd.barrier().mapPartitions { it =>
         val context = BarrierTaskContext.get()
         if (context.partitionId() == 0) {
           try {
+            new File(dir, runningFlagFile).createNewFile()
             context.barrier()
           } catch {
             case _: TaskKilledException =>
@@ -248,8 +253,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext {
           if (partitionId == 0) {
             new Thread {
               override def run: Unit = {
-                Thread.sleep(1000)
-                sc.killTaskAttempt(taskStart.taskInfo.taskId, interruptThread = interruptOnKill)
+                eventually(timeout(10.seconds)) {
+                  assert(new File(dir, runningFlagFile).exists())
+                  sc.killTaskAttempt(taskStart.taskInfo.taskId, interruptThread = interruptOnKill)
+                }
               }
             }.start()
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to