This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 0d26cfc  [SPARK-27460][TESTS][2.4] Running slowest test suites in their own forked JVMs for higher parallelism
0d26cfc is described below
commit 0d26cfcecd451a42a8d3eb14e6d66a88e955ce9e
Author: Gengliang Wang <[email protected]>
AuthorDate: Thu Sep 19 21:03:24 2019 -0700
[SPARK-27460][TESTS][2.4] Running slowest test suites in their own forked JVMs for higher parallelism
## What changes were proposed in this pull request?
This is a backport of https://github.com/apache/spark/pull/24373,
https://github.com/apache/spark/pull/24404, and
https://github.com/apache/spark/pull/24434.
This patch modifies SparkBuild so that the largest / slowest test suites
(or collections of suites) can run in their own forked JVMs, allowing them to
be run in parallel with each other. This opt-in / whitelisting approach allows
us to increase parallelism without having to fix a long-tail of flakiness /
brittleness issues in tests which aren't performance bottlenecks.
See the comments in SparkBuild.scala for details, including a summary of
why we sometimes opt to run entire groups of tests in a single forked JVM;
a simplified sketch of the grouping approach appears below.
The total time of a full pull request test run in Jenkins is reduced by around 53%:
before changes: 4hr 40min
after changes: 2hr 13min
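
For context, the heart of the SparkBuild change is an sbt `testGrouping` override that maps each whitelisted suite name to a test group of its own and runs every non-default group in a forked JVM. The sketch below is a simplified illustration of that idea (the whitelist is abbreviated, the fork options are left at their sbt defaults, and the object/helper names are illustrative); the real settings appear in full in project/SparkBuild.scala in the diff:

    import sbt._
    import sbt.Keys._

    object ParallelTestGroupingSketch {
      // Abbreviated whitelist of slow suites that justify a dedicated forked JVM.
      private val dedicatedJvmSuites = Set(
        "org.apache.spark.DistributedSuite",
        "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite")

      private val DEFAULT_TEST_GROUP = "default_test_group"

      // Whitelisted suites become a group named after themselves; everything else
      // shares the default group (and therefore a single JVM), as today.
      private def groupFor(suiteName: String): String =
        if (dedicatedJvmSuites.contains(suiteName)) suiteName else DEFAULT_TEST_GROUP

      lazy val settings = Seq(
        testGrouping in Test := {
          (definedTests in Test).value
            .groupBy(test => groupFor(test.name))
            .map { case (groupName, groupTests) =>
              // Tests.SubProcess forks one JVM per group, which is what buys the
              // extra parallelism for the whitelisted suites.
              new Tests.Group(
                name = groupName,
                tests = groupTests,
                runPolicy = Tests.SubProcess(ForkOptions()))
            }.toSeq
        }
      )
    }

Keeping the default group in a single JVM means only suites known to be both slow and fork-safe pay the per-JVM startup cost.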
## How was this patch tested?
Unit test
Closes #25861 from dongjoon-hyun/SPARK-27460.
Lead-authored-by: Gengliang Wang <[email protected]>
Co-authored-by: gatorsmile <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/ExecutorAllocationManagerSuite.scala | 18 ++--
.../org/apache/spark/SparkContextInfoSuite.scala | 9 +-
.../scala/org/apache/spark/SparkFunSuite.scala | 46 ++++++++++-
.../org/apache/spark/StatusTrackerSuite.scala | 2 +-
.../deploy/history/FsHistoryProviderSuite.scala | 20 +++--
.../scheduler/SparkListenerWithClusterSuite.scala | 10 +--
project/SparkBuild.scala | 95 +++++++++++++++++++++-
.../spark/deploy/yarn/BaseYarnClusterSuite.scala | 2 +-
.../spark/deploy/yarn/YarnClusterSuite.scala | 4 +-
.../org/apache/spark/sql/SQLQueryTestSuite.scala | 5 +-
.../execution/ui/SQLAppStatusListenerSuite.scala | 3 +-
.../sql/streaming/FileStreamSourceSuite.scala | 2 +-
.../apache/spark/sql/streaming/StreamTest.scala | 2 +-
.../sql/streaming/StreamingQueryManagerSuite.scala | 17 ++--
.../apache/spark/sql/test/SharedSparkSession.scala | 7 +-
.../spark/sql/hive/client/HiveClientSuite.scala | 2 +
.../org/apache/spark/streaming/ReceiverSuite.scala | 2 +-
17 files changed, 202 insertions(+), 44 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index a69045f..df5d265 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable
import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito.{mock, never, verify, when}
-import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+import org.scalatest.PrivateMethodTester
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.config
@@ -37,20 +37,24 @@ import org.apache.spark.util.ManualClock
*/
class ExecutorAllocationManagerSuite
extends SparkFunSuite
- with LocalSparkContext
- with BeforeAndAfter {
+ with LocalSparkContext {
import ExecutorAllocationManager._
import ExecutorAllocationManagerSuite._
private val contexts = new mutable.ListBuffer[SparkContext]()
- before {
+ override def beforeEach(): Unit = {
+ super.beforeEach()
contexts.clear()
}
- after {
- contexts.foreach(_.stop())
+ override def afterEach(): Unit = {
+ try {
+ contexts.foreach(_.stop())
+ } finally {
+ super.afterEach()
+ }
}
private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = {
@@ -281,7 +285,7 @@ class ExecutorAllocationManagerSuite
assert(totalRunningTasks(manager) === 0)
}
- test("cancel pending executors when no longer needed") {
+ testRetry("cancel pending executors when no longer needed") {
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5)))
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index 051a13c..c45f104 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -17,7 +17,10 @@
package org.apache.spark
+import scala.concurrent.duration._
+
import org.scalatest.Assertions
+import org.scalatest.concurrent.Eventually._
import org.apache.spark.storage.StorageLevel
@@ -58,10 +61,12 @@ class SparkContextInfoSuite extends SparkFunSuite with LocalSparkContext {
test("getRDDStorageInfo only reports on RDDs that actually persist data") {
sc = new SparkContext("local", "test")
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
- assert(sc.getRDDStorageInfo.size === 0)
+ assert(sc.getRDDStorageInfo.length === 0)
rdd.collect()
sc.listenerBus.waitUntilEmpty(10000)
- assert(sc.getRDDStorageInfo.size === 1)
+ eventually(timeout(10.seconds), interval(100.milliseconds)) {
+ assert(sc.getRDDStorageInfo.length === 1)
+ }
assert(sc.getRDDStorageInfo.head.isCached)
assert(sc.getRDDStorageInfo.head.memSize > 0)
assert(sc.getRDDStorageInfo.head.storageLevel === StorageLevel.MEMORY_ONLY)
diff --git a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
index 3128902..ffb679f 100644
--- a/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkFunSuite.scala
@@ -20,7 +20,9 @@ package org.apache.spark
// scalastyle:off
import java.io.File
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Outcome}
+import scala.annotation.tailrec
+
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, BeforeAndAfterEach, FunSuite, Outcome}
import org.apache.spark.internal.Logging
import org.apache.spark.util.AccumulatorContext
@@ -52,6 +54,7 @@ import org.apache.spark.util.AccumulatorContext
abstract class SparkFunSuite
extends FunSuite
with BeforeAndAfterAll
+ with BeforeAndAfterEach
with ThreadAudit
with Logging {
// scalastyle:on
@@ -88,6 +91,47 @@ abstract class SparkFunSuite
}
/**
+ * Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to
+ * set up and tear down resources.
+ */
+ def testRetry(s: String, n: Int = 2)(body: => Unit): Unit = {
+ test(s) {
+ retry(n) {
+ body
+ }
+ }
+ }
+
+ /**
+ * Note: this method doesn't support `BeforeAndAfter`. You must use `BeforeAndAfterEach` to
+ * set up and tear down resources.
+ */
+ def retry[T](n: Int)(body: => T): T = {
+ if (this.isInstanceOf[BeforeAndAfter]) {
+ throw new UnsupportedOperationException(
+ s"testRetry/retry cannot be used with ${classOf[BeforeAndAfter]}. " +
+ s"Please use ${classOf[BeforeAndAfterEach]} instead.")
+ }
+ retry0(n, n)(body)
+ }
+
+ @tailrec private final def retry0[T](n: Int, n0: Int)(body: => T): T = {
+ try body
+ catch { case e: Throwable =>
+ if (n > 0) {
+ logWarning(e.getMessage, e)
+ logInfo(s"\n\n===== RETRY #${n0 - n + 1} =====\n")
+ // Reset state before re-attempting in order so that tests which use patterns like
+ // LocalSparkContext to clean up state can work correctly when retried.
+ afterEach()
+ beforeEach()
+ retry0(n-1, n0)(body)
+ }
+ else throw e
+ }
+ }
+
+ /**
* Log the suite name and the test name before and after each test.
*
* Subclasses should never override this method. If they wish to run
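
As a quick reference, the `testRetry` helper added above behaves like `test(...)` but re-runs a failing body up to `n` additional times, calling `afterEach()`/`beforeEach()` between attempts so per-test state is reset. A minimal usage sketch (the suite and test body here are hypothetical, not part of this patch):

    import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}

    // Hypothetical suite illustrating the new helper; not part of this patch.
    class RetryUsageExampleSuite extends SparkFunSuite with LocalSparkContext {

      override def beforeEach(): Unit = {
        super.beforeEach()
        // LocalSparkContext stops `sc` in afterEach(), so each retry attempt
        // starts from a fresh SparkContext.
        sc = new SparkContext("local[2]", "retry-example")
      }

      // Runs like test(...), but retries the body up to 2 more times on failure.
      testRetry("count is stable across retries", n = 2) {
        assert(sc.parallelize(1 to 100, 4).count() === 100)
      }
    }

Note that suites mixing in `BeforeAndAfter` cannot use these helpers; `retry` throws UnsupportedOperationException for them, which is why several suites in this patch switch to `BeforeAndAfterEach`-style overrides.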
diff --git a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
index a15ae04..75812ae 100644
--- a/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/StatusTrackerSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.JobExecutionStatus._
class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkContext {
- test("basic status API usage") {
+ testRetry("basic status API usage") {
sc = new SparkContext("local", "test", new SparkConf(false))
val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync()
val jobId: Int = eventually(timeout(10 seconds)) {
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 6aad00b..dc15da5 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -34,7 +34,6 @@ import org.json4s.jackson.JsonMethods._
import org.mockito.ArgumentMatcher
import org.mockito.Matchers.{any, argThat}
import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
-import org.scalatest.BeforeAndAfter
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
@@ -48,16 +47,21 @@ import org.apache.spark.status.AppStatusStore
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils}
-class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
+class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
private var testDir: File = null
- before {
+ override def beforeEach(): Unit = {
+ super.beforeEach()
testDir = Utils.createTempDir(namePrefix = s"a b%20c+d")
}
- after {
- Utils.deleteRecursively(testDir)
+ override def afterEach(): Unit = {
+ try {
+ Utils.deleteRecursively(testDir)
+ } finally {
+ super.afterEach()
+ }
}
/** Create a fake log file using the new log format used in Spark 1.3+ */
@@ -487,7 +491,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
provider.inSafeMode = false
clock.setTime(10000)
- eventually(timeout(1 second), interval(10 millis)) {
+ eventually(timeout(3.second), interval(10.milliseconds)) {
provider.getConfig().keys should not contain ("HDFS State")
}
} finally {
@@ -495,7 +499,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}
- test("provider reports error after FS leaves safe mode") {
+ testRetry("provider reports error after FS leaves safe mode") {
testDir.delete()
val clock = new ManualClock()
val provider = new SafeModeTestProvider(createTestConf(), clock)
@@ -505,7 +509,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
provider.inSafeMode = false
clock.setTime(10000)
- eventually(timeout(1 second), interval(10 millis)) {
+ eventually(timeout(3.second), interval(10.milliseconds)) {
verify(errorHandler).uncaughtException(any(), any())
}
} finally {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
index 123f7f4..a6576e0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerWithClusterSuite.scala
@@ -19,25 +19,23 @@ package org.apache.spark.scheduler
import scala.collection.mutable
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-
import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils}
import org.apache.spark.scheduler.cluster.ExecutorInfo
/**
* Unit tests for SparkListener that require a local cluster.
*/
-class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
- with BeforeAndAfter with BeforeAndAfterAll {
+class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext {
/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000
- before {
+ override def beforeEach(): Unit = {
+ super.beforeEach()
sc = new SparkContext("local-cluster[2,1,1024]", "SparkListenerSuite")
}
- test("SparkListener sends executor added message") {
+ testRetry("SparkListener sends executor added message") {
val listener = new SaveExecutorInfo
sc.addSparkListener(listener)
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 56b27fa..1edbe17 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -430,6 +430,84 @@ object SparkBuild extends PomBuild {
else x.settings(Seq[Setting[_]](): _*)
} ++ Seq[Project](OldDeps.project)
}
+
+ if (!sys.env.contains("SERIAL_SBT_TESTS")) {
+ allProjects.foreach(enable(SparkParallelTestGrouping.settings))
+ }
+}
+
+object SparkParallelTestGrouping {
+ // Settings for parallelizing tests. The basic strategy here is to run the slowest suites (or
+ // collections of suites) in their own forked JVMs, allowing us to gain parallelism within a
+ // SBT project. Here, we take a whitelisting approach where the default behavior is to run all
+ // tests sequentially in a single JVM, requiring us to manually opt-in to the extra parallelism.
+ //
+ // There are a few reasons why such a whitelist approach is good:
+ //
+ // 1. Launching one JVM per suite adds significant overhead for short-running suites. In
+ // addition to JVM startup time and JIT warmup, it appears that initialization of Derby
+ // metastores can be very slow so creating a fresh warehouse per suite is inefficient.
+ //
+ // 2. When parallelizing within a project we need to give each forked JVM a different tmpdir
+ // so that the metastore warehouses do not collide. Unfortunately, it seems that there are
+ // some tests which have an overly tight dependency on the default tmpdir, so those fragile
+ // tests need to continue re-running in the default configuration (or need to be rewritten).
+ // Fixing that problem would be a huge amount of work for limited payoff in most cases
+ // because most test suites are short-running.
+ //
+
+ private val testsWhichShouldRunInTheirOwnDedicatedJvm = Set(
+ "org.apache.spark.DistributedSuite",
+ "org.apache.spark.sql.catalyst.expressions.DateExpressionsSuite",
+ "org.apache.spark.sql.catalyst.expressions.HashExpressionsSuite",
+ "org.apache.spark.sql.catalyst.expressions.CastSuite",
+ "org.apache.spark.sql.catalyst.expressions.MathExpressionsSuite",
+ "org.apache.spark.sql.hive.HiveExternalCatalogSuite",
+ "org.apache.spark.sql.hive.StatisticsSuite",
+ "org.apache.spark.sql.hive.execution.HiveCompatibilitySuite",
+ "org.apache.spark.sql.hive.client.VersionsSuite",
+ "org.apache.spark.sql.hive.client.HiveClientVersions",
+ "org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite",
+ "org.apache.spark.ml.classification.LogisticRegressionSuite",
+ "org.apache.spark.ml.classification.LinearSVCSuite",
+ "org.apache.spark.sql.SQLQueryTestSuite"
+ )
+
+ private val DEFAULT_TEST_GROUP = "default_test_group"
+
+ private def testNameToTestGroup(name: String): String = name match {
+ case _ if testsWhichShouldRunInTheirOwnDedicatedJvm.contains(name) => name
+ case _ => DEFAULT_TEST_GROUP
+ }
+
+ lazy val settings = Seq(
+ testGrouping in Test := {
+ val tests: Seq[TestDefinition] = (definedTests in Test).value
+ val defaultForkOptions = ForkOptions(
+ bootJars = Nil,
+ javaHome = javaHome.value,
+ connectInput = connectInput.value,
+ outputStrategy = outputStrategy.value,
+ runJVMOptions = (javaOptions in Test).value,
+ workingDirectory = Some(baseDirectory.value),
+ envVars = (envVars in Test).value
+ )
+ tests.groupBy(test => testNameToTestGroup(test.name)).map { case (groupName, groupTests) =>
+ val forkOptions = {
+ if (groupName == DEFAULT_TEST_GROUP) {
+ defaultForkOptions
+ } else {
+ defaultForkOptions.copy(runJVMOptions = defaultForkOptions.runJVMOptions ++
+ Seq(s"-Djava.io.tmpdir=${baseDirectory.value}/target/tmp/$groupName"))
+ }
+ }
+ new Tests.Group(
+ name = groupName,
+ tests = groupTests,
+ runPolicy = Tests.SubProcess(forkOptions))
+ }
+ }.toSeq
+ )
}
object Core {
@@ -844,8 +922,14 @@ object TestSettings {
testOptions in Test += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
// Enable Junit testing.
libraryDependencies += "com.novocode" % "junit-interface" % "0.11" % "test",
- // Only allow one test at a time, even across projects, since they run in the same JVM
- parallelExecution in Test := false,
+ // `parallelExecutionInTest` controls whether test suites belonging to the same SBT project
+ // can run in parallel with one another. It does NOT control whether tests execute in parallel
+ // within the same JVM (which is controlled by `testForkedParallel`) or whether test cases
+ // within the same suite can run in parallel (which is a ScalaTest runner option which is passed
+ // to the underlying runner but is not a SBT-level configuration). This needs to be `true` in
+ // order for the extra parallelism enabled by `SparkParallelTestGrouping` to take effect.
+ // The `SERIAL_SBT_TESTS` check is here so the extra parallelism can be feature-flagged.
+ parallelExecution in Test := { if (sys.env.contains("SERIAL_SBT_TESTS")) false else true },
// Make sure the test temp directory exists.
resourceGenerators in Test += Def.macroValueI(resourceManaged in Test map { outDir: File =>
var dir = new File(testTempDir)
@@ -866,7 +950,12 @@ object TestSettings {
}
Seq.empty[File]
}).value,
- concurrentRestrictions in Global += Tags.limit(Tags.Test, 1)
+ concurrentRestrictions in Global := {
+ // The number of concurrent test groups is empirically chosen based on experience
+ // with Jenkins flakiness.
+ if (sys.env.contains("SERIAL_SBT_TESTS")) (concurrentRestrictions in Global).value
+ else Seq(Tags.limit(Tags.ForkedTestGroup, 4))
+ }
)
}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 3a79131..48ce178 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -169,7 +169,7 @@ abstract class BaseYarnClusterSuite
val handle = launcher.startApplication()
try {
- eventually(timeout(2 minutes), interval(1 second)) {
+ eventually(timeout(3.minutes), interval(1.second)) {
assert(handle.getState().isFinal())
}
} finally {
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 32e3d05..2c34b4e 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -202,7 +202,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
.startApplication()
try {
- eventually(timeout(30 seconds), interval(100 millis)) {
+ eventually(timeout(3.minutes), interval(100.milliseconds)) {
handle.getState() should be (SparkAppHandle.State.RUNNING)
}
@@ -210,7 +210,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
handle.getAppId() should startWith ("application_")
handle.stop()
- eventually(timeout(30 seconds), interval(100 millis)) {
+ eventually(timeout(3.minutes), interval(100.milliseconds)) {
handle.getState() should be (SparkAppHandle.State.KILLED)
}
} finally {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index ab817ff..f3a741f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -267,9 +267,12 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
val df = session.sql(sql)
val schema = df.schema
val notIncludedMsg = "[not included in comparison]"
+ val clsName = this.getClass.getCanonicalName
// Get answer, but also get rid of the #1234 expression ids that show up in explain plans
val answer = df.queryExecution.hiveResultString().map(_.replaceAll("#\\d+", "#x")
- .replaceAll("Location.*/sql/core/", s"Location ${notIncludedMsg}sql/core/")
+ .replaceAll(
+ s"Location.*/sql/core/spark-warehouse/$clsName/",
+ s"Location ${notIncludedMsg}sql/core/spark-warehouse/")
.replaceAll("Created By.*", s"Created By $notIncludedMsg")
.replaceAll("Created Time.*", s"Created Time $notIncludedMsg")
.replaceAll("Last Access.*", s"Last Access $notIncludedMsg")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index c5f3fe5..0091f8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -490,7 +490,8 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
}
// Wait for listener to finish computing the metrics for the execution.
- while (statusStore.executionsList().last.metricValues == null) {
+ while (statusStore.executionsList().isEmpty ||
+ statusStore.executionsList().last.metricValues == null) {
Thread.sleep(100)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index fb0b365..9f6553e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -195,7 +195,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
import testImplicits._
- override val streamingTimeout = 20.seconds
+ override val streamingTimeout = 80.seconds
/** Use `format` and `path` to create FileStreamSource via DataFrameReader */
private def createFileStreamSource(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 35644c5..7bd1320 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -87,7 +87,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
protected val defaultUseV2Sink = false
/** How long to wait for an active stream to catch up when checking a result. */
- val streamingTimeout = 10.seconds
+ val streamingTimeout = 60.seconds
/** A trait for actions that can be performed while testing a streaming DataFrame. */
trait StreamAction
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 46eec73..d17d035 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -36,21 +36,26 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.BlockingSource
import org.apache.spark.util.Utils
-class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
+class StreamingQueryManagerSuite extends StreamTest {
import AwaitTerminationTester._
import testImplicits._
override val streamingTimeout = 20.seconds
- before {
+ override def beforeEach(): Unit = {
+ super.beforeEach()
assert(spark.streams.active.isEmpty)
spark.streams.resetTerminated()
}
- after {
- assert(spark.streams.active.isEmpty)
- spark.streams.resetTerminated()
+ override def afterEach(): Unit = {
+ try {
+ assert(spark.streams.active.isEmpty)
+ spark.streams.resetTerminated()
+ } finally {
+ super.afterEach()
+ }
}
testQuietly("listing") {
@@ -84,7 +89,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
}
}
- testQuietly("awaitAnyTermination without timeout and resetTerminated") {
+ testRetry("awaitAnyTermination without timeout and resetTerminated") {
val datasets = Seq.fill(5)(makeDataset._2)
withQueriesOn(datasets: _*) { queries =>
require(queries.size === datasets.size)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
index e7e0ce6..efdbc7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SharedSparkSession.scala
@@ -25,7 +25,7 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.{SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
/**
* Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
@@ -36,7 +36,7 @@ trait SharedSparkSession
with Eventually { self: Suite =>
protected def sparkConf = {
- new SparkConf()
+ val conf = new SparkConf()
.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
.set("spark.unsafe.exceptionOnMemoryLeak", "true")
.set(SQLConf.CODEGEN_FALLBACK.key, "false")
@@ -45,6 +45,9 @@ trait SharedSparkSession
// this rule may potentially block testing of other optimization rules such as
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
+ conf.set(
+ StaticSQLConf.WAREHOUSE_PATH,
+ conf.get(StaticSQLConf.WAREHOUSE_PATH) + "/" + getClass.getCanonicalName)
}
/**
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
index fa9f753..5bdb13a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType}
+import org.apache.spark.util.Utils
// TODO: Refactor this to `HivePartitionFilteringSuite`
class HiveClientSuite(version: String)
@@ -45,6 +46,7 @@ class HiveClientSuite(version: String)
val hadoopConf = new Configuration()
hadoopConf.setBoolean(tryDirectSqlKey, tryDirectSql)
+ hadoopConf.set("hive.metastore.warehouse.dir",
Utils.createTempDir().toURI().toString())
val client = buildClient(hadoopConf)
client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (ds INT, h INT, chunk STRING)")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
index fc6218a..33f93da 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala
@@ -121,7 +121,7 @@ class ReceiverSuite extends TestSuiteBase with TimeLimits with Serializable {
}
// Verify that stopping actually stops the thread
- failAfter(100 millis) {
+ failAfter(1.second) {
receiver.stop("test")
assert(receiver.isStopped)
assert(!receiver.otherThread.isAlive)