This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new da272f7 [SPARK-33793][TESTS][3.0] Introduce withExecutor to ensure
proper cleanup in tests
da272f7 is described below
commit da272f7ffb370b10d151da124586c52ca343e056
Author: Sander Goos <[email protected]>
AuthorDate: Wed Dec 16 23:33:09 2020 +0900
[SPARK-33793][TESTS][3.0] Introduce withExecutor to ensure proper cleanup
in tests
Backport of: https://github.com/apache/spark/pull/30783
### What changes were proposed in this pull request?
This PR introduces a helper method `withExecutor` that handles the creation
of an Executor object and ensures that it is always stopped in a finally block.
The tests in ExecutorSuite have been refactored to use this method.
### Why are the changes needed?
Recently an issue was discovered where leaked Executors (which are not
explicitly stopped after a test) can cause other tests to fail due to the JVM
being killed after 10 min. It is therefore crucial that tests always stop the
Executor. By introducing this helper method, a simple pattern is established
that can be easily adopted in new tests, which reduces the risk of regressions.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Ran the ExecutorSuite locally.
Closes #30801 from sander-goos/SPARK-33793-close-executors-3.0.
Authored-by: Sander Goos <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
---
.../org/apache/spark/executor/ExecutorSuite.scala | 99 ++++++++++++----------
1 file changed, 54 insertions(+), 45 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 8e58bef..6b3df6d 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.executor
import java.io.{Externalizable, File, ObjectInput, ObjectOutput}
import java.lang.Thread.UncaughtExceptionHandler
+import java.net.URL
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
@@ -53,7 +54,7 @@ import org.apache.spark.scheduler.{DirectTaskResult,
FakeTask, ResultTask, Task,
import org.apache.spark.serializer.{JavaSerializer, SerializerInstance,
SerializerManager}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{BlockManager, BlockManagerId}
-import org.apache.spark.util.{LongAccumulator, UninterruptibleThread, Utils}
+import org.apache.spark.util.{LongAccumulator, SparkUncaughtExceptionHandler,
UninterruptibleThread, Utils}
class ExecutorSuite extends SparkFunSuite
with LocalSparkContext with MockitoSugar with Eventually with
PrivateMethodTester {
@@ -64,6 +65,33 @@ class ExecutorSuite extends SparkFunSuite
super.afterEach()
}
+ /**
+ * Creates an Executor with the provided arguments, which is then passed to
+ * `f` and will be stopped after `f` returns.
+ */
+ def withExecutor(
+ executorId: String,
+ executorHostname: String,
+ env: SparkEnv,
+ userClassPath: Seq[URL] = Nil,
+ isLocal: Boolean = true,
+ uncaughtExceptionHandler: UncaughtExceptionHandler
+ = new SparkUncaughtExceptionHandler,
+ resources: immutable.Map[String, ResourceInformation]
+ = immutable.Map.empty[String, ResourceInformation])(f: Executor =>
Unit): Unit = {
+ var executor: Executor = null
+ try {
+ executor = new Executor(executorId, executorHostname, env,
userClassPath, isLocal,
+ uncaughtExceptionHandler, resources)
+
+ f(executor)
+ } finally {
+ if (executor != null) {
+ executor.stop()
+ }
+ }
+ }
+
test("SPARK-15963: Catch `TaskKilledException` correctly in
Executor.TaskRunner") {
// mock some objects to make Executor.launchTask() happy
val conf = new SparkConf
@@ -116,10 +144,8 @@ class ExecutorSuite extends SparkFunSuite
}
})
- var executor: Executor = null
- try {
- executor = new Executor("id", "localhost", env, userClassPath = Nil,
isLocal = true,
- resources = immutable.Map.empty[String, ResourceInformation])
+ withExecutor("id", "localhost", env) { executor =>
+
// the task will be launched in a dedicated worker thread
executor.launchTask(mockExecutorBackend, taskDescription)
@@ -139,11 +165,6 @@ class ExecutorSuite extends SparkFunSuite
assert(executorSuiteHelper.testFailedReason.toErrorString ===
"TaskKilled (test)")
assert(executorSuiteHelper.taskState === TaskState.KILLED)
}
- finally {
- if (executor != null) {
- executor.stop()
- }
- }
}
test("SPARK-19276: Handle FetchFailedExceptions that are hidden by user
exceptions") {
@@ -255,25 +276,24 @@ class ExecutorSuite extends SparkFunSuite
confs.foreach { case (k, v) => conf.set(k, v) }
val serializer = new JavaSerializer(conf)
val env = createMockEnv(conf, serializer)
- val executor =
- new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil,
isLocal = true,
- resources = immutable.Map.empty[String, ResourceInformation])
- val executorClass = classOf[Executor]
-
- // Save all heartbeats sent into an ArrayBuffer for verification
- val heartbeats = ArrayBuffer[Heartbeat]()
- val mockReceiver = mock[RpcEndpointRef]
- when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
- .thenAnswer((invocation: InvocationOnMock) => {
- val args = invocation.getArguments()
- heartbeats += args(0).asInstanceOf[Heartbeat]
- HeartbeatResponse(false)
- })
- val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
- receiverRef.setAccessible(true)
- receiverRef.set(executor, mockReceiver)
+ withExecutor("id", "localhost", SparkEnv.get) { executor =>
+ val executorClass = classOf[Executor]
- f(executor, heartbeats)
+ // Save all heartbeats sent into an ArrayBuffer for verification
+ val heartbeats = ArrayBuffer[Heartbeat]()
+ val mockReceiver = mock[RpcEndpointRef]
+ when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
+ .thenAnswer((invocation: InvocationOnMock) => {
+ val args = invocation.getArguments()
+ heartbeats += args(0).asInstanceOf[Heartbeat]
+ HeartbeatResponse(false)
+ })
+ val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
+ receiverRef.setAccessible(true)
+ receiverRef.set(executor, mockReceiver)
+
+ f(executor, heartbeats)
+ }
}
private def heartbeatZeroAccumulatorUpdateTest(dropZeroMetrics: Boolean):
Unit = {
@@ -354,10 +374,7 @@ class ExecutorSuite extends SparkFunSuite
val taskDescription = createResultTaskDescription(serializer, taskBinary,
rdd, 0)
val mockBackend = mock[ExecutorBackend]
- var executor: Executor = null
- try {
- executor = new Executor("id", "localhost", SparkEnv.get, userClassPath =
Nil, isLocal = true,
- resources = immutable.Map.empty[String, ResourceInformation])
+ withExecutor("id", "localhost", SparkEnv.get) { executor =>
executor.launchTask(mockBackend, taskDescription)
// Ensure that the executor's metricsPoller is polled so that values are
recorded for
@@ -368,10 +385,6 @@ class ExecutorSuite extends SparkFunSuite
eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert(executor.numRunningTasks === 0)
}
- } finally {
- if (executor != null) {
- executor.stop()
- }
}
// Verify that peak values for task metrics get sent in the TaskResult
@@ -466,12 +479,11 @@ class ExecutorSuite extends SparkFunSuite
poll: Boolean = false): (TaskFailedReason, UncaughtExceptionHandler) = {
val mockBackend = mock[ExecutorBackend]
val mockUncaughtExceptionHandler = mock[UncaughtExceptionHandler]
- var executor: Executor = null
val timedOut = new AtomicBoolean(false)
- try {
- executor = new Executor("id", "localhost", SparkEnv.get, userClassPath =
Nil, isLocal = true,
- uncaughtExceptionHandler = mockUncaughtExceptionHandler,
- resources = immutable.Map.empty[String, ResourceInformation])
+
+ withExecutor("id", "localhost", SparkEnv.get,
+ uncaughtExceptionHandler = mockUncaughtExceptionHandler) { executor =>
+
// the task will be launched in a dedicated worker thread
executor.launchTask(mockBackend, taskDescription)
if (killTask) {
@@ -504,11 +516,8 @@ class ExecutorSuite extends SparkFunSuite
assert(executor.numRunningTasks === 0)
}
assert(!timedOut.get(), "timed out waiting to be ready to kill tasks")
- } finally {
- if (executor != null) {
- executor.stop()
- }
}
+
val orderedMock = inOrder(mockBackend)
val statusCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
orderedMock.verify(mockBackend)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]