This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new da272f7  [SPARK-33793][TESTS][3.0] Introduce withExecutor to ensure 
proper cleanup in tests
da272f7 is described below

commit da272f7ffb370b10d151da124586c52ca343e056
Author: Sander Goos <[email protected]>
AuthorDate: Wed Dec 16 23:33:09 2020 +0900

    [SPARK-33793][TESTS][3.0] Introduce withExecutor to ensure proper cleanup 
in tests
    
    Backport of: https://github.com/apache/spark/pull/30783
    
    ### What changes were proposed in this pull request?
    This PR introduces a helper method `withExecutor` that handles the creation 
of an Executor object and ensures that it is always stopped in a finally block. 
The tests in ExecutorSuite have been refactored to use this method.
    
    ### Why are the changes needed?
    Recently an issue was discovered that leaked Executors (which are not 
explicitly stopped after a test) can cause other tests to fail due to the JVM 
being killed after 10 min. It is therefore crucial that tests always stop the 
Executor. By introducing this helper method, a simple pattern is established 
that can be easily adopted in new tests, which reduces the risk of regressions.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Run the ExecutorSuite locally.
    
    Closes #30801 from sander-goos/SPARK-33793-close-executors-3.0.
    
    Authored-by: Sander Goos <[email protected]>
    Signed-off-by: HyukjinKwon <[email protected]>
---
 .../org/apache/spark/executor/ExecutorSuite.scala  | 99 ++++++++++++----------
 1 file changed, 54 insertions(+), 45 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala 
b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 8e58bef..6b3df6d 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.executor
 
 import java.io.{Externalizable, File, ObjectInput, ObjectOutput}
 import java.lang.Thread.UncaughtExceptionHandler
+import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Properties
 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}
@@ -53,7 +54,7 @@ import org.apache.spark.scheduler.{DirectTaskResult, 
FakeTask, ResultTask, Task,
 import org.apache.spark.serializer.{JavaSerializer, SerializerInstance, 
SerializerManager}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{BlockManager, BlockManagerId}
-import org.apache.spark.util.{LongAccumulator, UninterruptibleThread, Utils}
+import org.apache.spark.util.{LongAccumulator, SparkUncaughtExceptionHandler, 
UninterruptibleThread, Utils}
 
 class ExecutorSuite extends SparkFunSuite
     with LocalSparkContext with MockitoSugar with Eventually with 
PrivateMethodTester {
@@ -64,6 +65,33 @@ class ExecutorSuite extends SparkFunSuite
     super.afterEach()
   }
 
+  /**
+   * Creates an Executor with the provided arguments, is then passed to `f`
+   * and will be stopped after `f` returns.
+   */
+  def withExecutor(
+      executorId: String,
+      executorHostname: String,
+      env: SparkEnv,
+      userClassPath: Seq[URL] = Nil,
+      isLocal: Boolean = true,
+      uncaughtExceptionHandler: UncaughtExceptionHandler
+        = new SparkUncaughtExceptionHandler,
+      resources: immutable.Map[String, ResourceInformation]
+        = immutable.Map.empty[String, ResourceInformation])(f: Executor => 
Unit): Unit = {
+    var executor: Executor = null
+    try {
+      executor = new Executor(executorId, executorHostname, env, 
userClassPath, isLocal,
+        uncaughtExceptionHandler, resources)
+
+      f(executor)
+    } finally {
+      if (executor != null) {
+        executor.stop()
+      }
+    }
+  }
+
   test("SPARK-15963: Catch `TaskKilledException` correctly in 
Executor.TaskRunner") {
     // mock some objects to make Executor.launchTask() happy
     val conf = new SparkConf
@@ -116,10 +144,8 @@ class ExecutorSuite extends SparkFunSuite
         }
       })
 
-    var executor: Executor = null
-    try {
-      executor = new Executor("id", "localhost", env, userClassPath = Nil, 
isLocal = true,
-        resources = immutable.Map.empty[String, ResourceInformation])
+    withExecutor("id", "localhost", env) { executor =>
+
       // the task will be launched in a dedicated worker thread
       executor.launchTask(mockExecutorBackend, taskDescription)
 
@@ -139,11 +165,6 @@ class ExecutorSuite extends SparkFunSuite
       assert(executorSuiteHelper.testFailedReason.toErrorString === 
"TaskKilled (test)")
       assert(executorSuiteHelper.taskState === TaskState.KILLED)
     }
-    finally {
-      if (executor != null) {
-        executor.stop()
-      }
-    }
   }
 
   test("SPARK-19276: Handle FetchFailedExceptions that are hidden by user 
exceptions") {
@@ -255,25 +276,24 @@ class ExecutorSuite extends SparkFunSuite
     confs.foreach { case (k, v) => conf.set(k, v) }
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
-    val executor =
-      new Executor("id", "localhost", SparkEnv.get, userClassPath = Nil, 
isLocal = true,
-        resources = immutable.Map.empty[String, ResourceInformation])
-    val executorClass = classOf[Executor]
-
-    // Save all heartbeats sent into an ArrayBuffer for verification
-    val heartbeats = ArrayBuffer[Heartbeat]()
-    val mockReceiver = mock[RpcEndpointRef]
-    when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
-      .thenAnswer((invocation: InvocationOnMock) => {
-        val args = invocation.getArguments()
-        heartbeats += args(0).asInstanceOf[Heartbeat]
-        HeartbeatResponse(false)
-      })
-    val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
-    receiverRef.setAccessible(true)
-    receiverRef.set(executor, mockReceiver)
+    withExecutor("id", "localhost", SparkEnv.get) { executor =>
+      val executorClass = classOf[Executor]
 
-    f(executor, heartbeats)
+      // Save all heartbeats sent into an ArrayBuffer for verification
+      val heartbeats = ArrayBuffer[Heartbeat]()
+      val mockReceiver = mock[RpcEndpointRef]
+      when(mockReceiver.askSync(any[Heartbeat], any[RpcTimeout])(any))
+        .thenAnswer((invocation: InvocationOnMock) => {
+          val args = invocation.getArguments()
+          heartbeats += args(0).asInstanceOf[Heartbeat]
+          HeartbeatResponse(false)
+        })
+      val receiverRef = executorClass.getDeclaredField("heartbeatReceiverRef")
+      receiverRef.setAccessible(true)
+      receiverRef.set(executor, mockReceiver)
+
+      f(executor, heartbeats)
+    }
   }
 
   private def heartbeatZeroAccumulatorUpdateTest(dropZeroMetrics: Boolean): 
Unit = {
@@ -354,10 +374,7 @@ class ExecutorSuite extends SparkFunSuite
     val taskDescription = createResultTaskDescription(serializer, taskBinary, 
rdd, 0)
 
     val mockBackend = mock[ExecutorBackend]
-    var executor: Executor = null
-    try {
-      executor = new Executor("id", "localhost", SparkEnv.get, userClassPath = 
Nil, isLocal = true,
-        resources = immutable.Map.empty[String, ResourceInformation])
+    withExecutor("id", "localhost", SparkEnv.get) { executor =>
       executor.launchTask(mockBackend, taskDescription)
 
       // Ensure that the executor's metricsPoller is polled so that values are 
recorded for
@@ -368,10 +385,6 @@ class ExecutorSuite extends SparkFunSuite
       eventually(timeout(5.seconds), interval(10.milliseconds)) {
         assert(executor.numRunningTasks === 0)
       }
-    } finally {
-      if (executor != null) {
-        executor.stop()
-      }
     }
 
     // Verify that peak values for task metrics get sent in the TaskResult
@@ -466,12 +479,11 @@ class ExecutorSuite extends SparkFunSuite
       poll: Boolean = false): (TaskFailedReason, UncaughtExceptionHandler) = {
     val mockBackend = mock[ExecutorBackend]
     val mockUncaughtExceptionHandler = mock[UncaughtExceptionHandler]
-    var executor: Executor = null
     val timedOut = new AtomicBoolean(false)
-    try {
-      executor = new Executor("id", "localhost", SparkEnv.get, userClassPath = 
Nil, isLocal = true,
-        uncaughtExceptionHandler = mockUncaughtExceptionHandler,
-        resources = immutable.Map.empty[String, ResourceInformation])
+
+    withExecutor("id", "localhost", SparkEnv.get,
+        uncaughtExceptionHandler = mockUncaughtExceptionHandler) { executor =>
+
       // the task will be launched in a dedicated worker thread
       executor.launchTask(mockBackend, taskDescription)
       if (killTask) {
@@ -504,11 +516,8 @@ class ExecutorSuite extends SparkFunSuite
         assert(executor.numRunningTasks === 0)
       }
       assert(!timedOut.get(), "timed out waiting to be ready to kill tasks")
-    } finally {
-      if (executor != null) {
-        executor.stop()
-      }
     }
+
     val orderedMock = inOrder(mockBackend)
     val statusCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
     orderedMock.verify(mockBackend)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to