This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new b763c23 [SPARK-31643][TEST] Fix flaky o.a.s.scheduler.BarrierTaskContextSuite.barrier task killed, interrupt b763c23 is described below commit b763c23307a69f018b833add14b7da18fa35df02 Author: yi.wu <yi...@databricks.com> AuthorDate: Tue May 5 12:36:42 2020 -0700 [SPARK-31643][TEST] Fix flaky o.a.s.scheduler.BarrierTaskContextSuite.barrier task killed, interrupt ### What changes were proposed in this pull request? Make sure the task has nearly reached `context.barrier()` before killing. ### Why are the changes needed? In case the task is killed before it reaches `context.barrier()`, the task will not create the expected file. ``` Error Message org.scalatest.exceptions.TestFailedException: new java.io.File(dir, killedFlagFile).exists() was false Expect barrier task being killed. Stacktrace sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: new java.io.File(dir, killedFlagFile).exists() was false Expect barrier task being killed. 
at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560) at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:503) at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$testBarrierTaskKilled$1(BarrierTaskContextSuite.scala:266) at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$testBarrierTaskKilled$1$adapted(BarrierTaskContextSuite.scala:226) at org.apache.spark.SparkFunSuite.withTempDir(SparkFunSuite.scala:163) at org.apache.spark.scheduler.BarrierTaskContextSuite.testBarrierTaskKilled(BarrierTaskContextSuite.scala:226) at org.apache.spark.scheduler.BarrierTaskContextSuite.$anonfun$new$29(BarrierTaskContextSuite.scala:277) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) ``` [Here's](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/122273/testReport/org.apache.spark.scheduler/BarrierTaskContextSuite/barrier_task_killed__interrupt/) the full error messages. ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #28454 from Ngone51/fix_kill_interrupt. 
Authored-by: yi.wu <yi...@databricks.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit 61a6ca5d3f623c2a8b49277ac62d77bf4dbfa84f) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../apache/spark/scheduler/BarrierTaskContextSuite.scala | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala index 2242d28..c4e5e7c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala @@ -21,10 +21,13 @@ import java.io.File import scala.util.Random +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar._ + import org.apache.spark._ import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY -class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { +class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with Eventually { def initLocalClusterSparkContext(numWorker: Int = 4): Unit = { val conf = new SparkConf() @@ -224,12 +227,14 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { def testBarrierTaskKilled(interruptOnKill: Boolean): Unit = { withTempDir { dir => + val runningFlagFile = "barrier.task.running" val killedFlagFile = "barrier.task.killed" val rdd = sc.makeRDD(Seq(0, 1), 2) val rdd2 = rdd.barrier().mapPartitions { it => val context = BarrierTaskContext.get() if (context.partitionId() == 0) { try { + new File(dir, runningFlagFile).createNewFile() context.barrier() } catch { case _: TaskKilledException => @@ -248,8 +253,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { if (partitionId == 0) { new Thread { override def run: Unit = { - Thread.sleep(1000) - sc.killTaskAttempt(taskStart.taskInfo.taskId, interruptThread = interruptOnKill) + eventually(timeout(10.seconds)) { + assert(new File(dir, runningFlagFile).exists()) + sc.killTaskAttempt(taskStart.taskInfo.taskId, interruptThread = interruptOnKill) + } } }.start() } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org