This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 0bd2dab [SPARK-37594][SPARK-34399][SQL][TESTS] Make UT more stable
0bd2dab is described below
commit 0bd2dab53baaf1ad19a4389e5bcbeb388693d11f
Author: Angerszhuuuu <[email protected]>
AuthorDate: Fri Dec 10 10:53:31 2021 -0600
[SPARK-37594][SPARK-34399][SQL][TESTS] Make UT more stable
### What changes were proposed in this pull request?
Some GA runs fail in the UT `test("SPARK-34399: Add job commit duration metrics for DataWritingCommand")` with errors such as:
```
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 0 was not greater than 0
    at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
    at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
    at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
    at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
    at org.apache.spark.sql.execution.metric.SQLMetricsSuite.$anonfun$new$87(SQLMetricsSuite.scala:810)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.sql.test.SQLTestUtilsBase.withTable(SQLTestUtils.scala:305)
    at org.apache.spark.sql.test.SQLTestUtilsBase.withTable$(SQLTestUtils.scala:303)
    at org.apache.spark.sql.execution.metric.SQLMetricsSuite.withTable(SQLMetricsSuite.scala:44)
    at org.apache.spark.sql.execution.metric.SQLMetricsSuite.$anonfun$new$86(SQLMetricsSuite.scala:800)
    at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
    at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
    at org.apache.spark.sql.execution.metric.SQLMetricsSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(SQLMetricsSuite.scala:44)
    at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:246)
    at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:244)
    at org.apache.spark.sql.execution.metric.SQLMetricsSuite.withSQLConf(SQLMetricsSuite.scala:44)
    at org.apache.spark.sql.execution.metric.SQLMetricsSuite.$anonfun$new$85(SQLMetricsSuite.scala:800)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite.$anonfun$test$5(AdaptiveTestUtils.scala:65)
    at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf(SQLHelper.scala:54)
    at org.apache.spark.sql.catalyst.plans.SQLHelper.withSQLConf$(SQLHelper.scala:38)
    at org.apache.spark.sql.execution.metric.SQLMetricsSuite.org$apache$spark$sql$test$SQLTestUtilsBase$$super$withSQLConf(SQLMetricsSuite.scala:44)
    at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf(SQLTestUtils.scala:246)
    at org.apache.spark.sql.test.SQLTestUtilsBase.withSQLConf$(SQLTestUtils.scala:244)
    at org.apache.spark.sql.execution.metric.SQLMetricsSuite.withSQLConf(SQLMetricsSuite.scala:44)
    at org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite.$anonfun$test$4(AdaptiveTestUtils.scala:65)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
    at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:190)
    at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
    at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
    at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:62)
    at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
    at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
    at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:62)
    at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
```
This PR adds a fixed job-commit delay and task-commit delay (and drains the listener bus before checking the metrics) to make the test more stable.
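As a rough sketch of the idea (simplified from the diff below; the surrounding test setup in `SQLMetricsSuite` is omitted and the import paths are the ones I would expect for these classes), the test-only commit protocol now sleeps a fixed 100 ms in both the task-commit and job-commit paths, so the recorded `TASK_COMMIT_TIME` and `JOB_COMMIT_TIME` durations cannot end up as zero:
```scala
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol

// Test-only protocol: a fixed delay (instead of Random.nextInt(100), which can be 0)
// guarantees that both commit-duration metrics are strictly greater than zero.
case class CustomFileCommitProtocol(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean = false)
  extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {

  override def commitTask(
      taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = {
    Thread.sleep(100) // fixed delay so the task-commit duration is measurable
    super.commitTask(taskContext)
  }

  override def commitJob(
      jobContext: JobContext,
      taskCommits: Seq[FileCommitProtocol.TaskCommitMessage]): Unit = {
    Thread.sleep(100) // previously there was no delay at all on the job-commit path
    super.commitJob(jobContext, taskCommits)
  }
}
```
On the assertion side, the test now calls `sparkContext.listenerBus.waitUntilEmpty()` before reading the metrics, presumably so that any pending listener events carrying the driver-side metric updates have been processed before the assertions run.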
### Why are the changes needed?
Make the unit test more stable.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UT.
Closes #34847 from AngersZhuuuu/SPARK-37594.
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit 471a5b55b80256ccd253c93623ff363add5f1985)
Signed-off-by: Sean Owen <[email protected]>
---
.../spark/sql/execution/metric/SQLMetricsSuite.scala | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 32428fb..a51003e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -22,7 +22,7 @@ import java.io.File
import scala.reflect.{classTag, ClassTag}
import scala.util.Random
-import org.apache.hadoop.mapreduce.TaskAttemptContext
+import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql._
@@ -803,6 +803,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
val insert = df.queryExecution.executedPlan.collect {
case CommandResultExec(_, dataWriting: DataWritingCommandExec, _) => dataWriting.cmd
}
+ sparkContext.listenerBus.waitUntilEmpty()
assert(insert.size == 1)
assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.JOB_COMMIT_TIME))
assert(insert.head.metrics.contains(BasicWriteJobStatsTracker.TASK_COMMIT_TIME))
@@ -836,8 +837,15 @@ case class CustomFileCommitProtocol(
dynamicPartitionOverwrite: Boolean = false)
extends SQLHadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {
override def commitTask(
- taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = {
- Thread.sleep(Random.nextInt(100))
+ taskContext: TaskAttemptContext): FileCommitProtocol.TaskCommitMessage = {
+ Thread.sleep(100)
super.commitTask(taskContext)
}
+
+ override def commitJob(
+ jobContext: JobContext,
+ taskCommits: Seq[FileCommitProtocol.TaskCommitMessage]): Unit = {
+ Thread.sleep(100)
+ super.commitJob(jobContext, taskCommits)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]