This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new 0bd2dab [SPARK-37594][SPARK-34399][SQL][TESTS] Make UT more stable 0bd2dab is described below commit 0bd2dab53baaf1ad19a4389e5bcbeb388693d11f Author: Angerszhuuuu <angers....@gmail.com> AuthorDate: Fri Dec 10 10:53:31 2021 -0600 [SPARK-37594][SPARK-34399][SQL][TESTS] Make UT more stable ### What changes were proposed in this pull request? There are some GA test failed caused by UT ` test("SPARK-34399: Add job commit duration metrics for DataWritingCommand") ` such as ``` sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 0 was not greater than 0 at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) at org.apache.spark.sql.execution.metric.SQLMetricsSuite.$anonfun$new$87(SQLMetricsSuite.scala:810) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:305) at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:303) at org.apache.spark.sql.execution.metric.SQLMetricsSuite.withTable(SQLMetricsSuite.scala:44) at org.apache.spark.sql.execution.metric.SQLMetricsSuite.$anonfun$new$86(SQLMetricsSuite.scala:800) at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54) at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38) at org.apache.spark.sql.execution.metric.SQLMetricsSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(SQLMetricsSuite.scala:44) at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:246) at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:244) at org.apache.spark.sql.execution.metric.SQLMetricsSuite.withSQLConf(SQLMetricsSuite.scala:44) at org.apache.spark.sql.execution.metric.SQLMetricsSuite.$anonfun$new$85(SQLMetricsSuite.scala:800) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite.$anonfun$test$5(AdaptiveTestUtils.scala:65) at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54) at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38) at org.apache.spark.sql.execution.metric.SQLMetricsSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(SQLMetricsSuite.scala:44) at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:246) at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:244) at org.apache.spark.sql.execution.metric.SQLMetricsSuite.withSQLConf(SQLMetricsSuite.scala:44) at org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite.$anonfun$test$4(AdaptiveTestUtils.scala:65) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:190) at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236) at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218) at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:62) at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:62) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269) ``` This pr to add a certain job commit delay and task commit delay to make it more stable. ### Why are the changes needed? Make unit test more stable. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existed UT Closes #34847 from AngersZhuuuu/SPARK-37594. Authored-by: Angerszhuuuu <angers....@gmail.com> Signed-off-by: Sean Owen <sro...@gmail.com> (cherry picked from commit 471a5b55b80256ccd253c93623ff363add5f1985) Signed-off-by: Sean Owen <sro...@gmail.com> --- .../spark/sql/execution/metric/SQLMetricsSuite.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 32428fb..a51003e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -22,7 +22,7 @@ import java.io.File import scala.reflect.{classTag, ClassTag} import scala.util.Random -import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql._ @@ -803,6 +803,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils val insert = df.queryExecution.executedPlan.collect { case CommandResultExec(_, dataWriting: DataWritingCommandExec, _) => dataWriting.cmd } + sparkContext.listenerBus.waitUntilEmpty() assert(insert.size == 1) assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.JOB_COMMIT_TIME)) assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.TASK_COMMIT_TIME)) @@ -836,8 +837,15 @@ case class CustomFileCommitProtocol( dynamicPartitionOverwrite: Boolean = false) extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) { override def commitTask( - taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = { - Thread.sleep(Random.nextInt(100)) + taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = { + Thread.sleep(100) super.commitTask(taskContext) } + + override def commitJob( + jobContext: JobContext, + taskCommits: Seq[FileCommitProtocol.TaskCommitMessage]): Unit = { + Thread.sleep(100) + super.commitJob(jobContext, taskCommits) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org