Repository: spark
Updated Branches:
  refs/heads/master 7b7c85ede -> 2aaed0a4d


[SPARK-21006][TESTS][FOLLOW-UP] Some Worker's RpcEnv is leaked in WorkerSuite

## What changes were proposed in this pull request?

The RpcEnv created for each Worker in these tests was never shut down, leaking it. Shut it down after each test, following the same approach as #18226.

## How was this patch tested?
Existing unit tests.

Author: liuxian <[email protected]>

Closes #18259 from 10110346/wip-lx-0610.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2aaed0a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2aaed0a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2aaed0a4

Branch: refs/heads/master
Commit: 2aaed0a4db84e99186b52a2c49d532702b575406
Parents: 7b7c85e
Author: liuxian <[email protected]>
Authored: Tue Jun 13 12:29:50 2017 +0100
Committer: Sean Owen <[email protected]>
Committed: Tue Jun 13 12:29:50 2017 +0100

----------------------------------------------------------------------
 .../spark/deploy/worker/WorkerSuite.scala       | 39 +++++++++++++-------
 1 file changed, 25 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2aaed0a4/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
index 101a44e..ce212a7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.worker
 
-import org.scalatest.Matchers
+import org.scalatest.{BeforeAndAfter, Matchers}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.{Command, ExecutorState}
@@ -25,7 +25,7 @@ import 
org.apache.spark.deploy.DeployMessages.{DriverStateChanged, ExecutorState
 import org.apache.spark.deploy.master.DriverState
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 
-class WorkerSuite extends SparkFunSuite with Matchers {
+class WorkerSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
 
   import org.apache.spark.deploy.DeployTestUtils._
 
@@ -34,6 +34,25 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   }
   def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = 
false).setAll(opts)
 
+  private var _worker: Worker = _
+
+  private def makeWorker(conf: SparkConf): Worker = {
+    assert(_worker === null, "Some Worker's RpcEnv is leaked in tests")
+    val securityMgr = new SecurityManager(conf)
+    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, securityMgr)
+    _worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, 
Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
+      "Worker", "/tmp", conf, securityMgr)
+    _worker
+  }
+
+  after {
+    if (_worker != null) {
+      _worker.rpcEnv.shutdown()
+      _worker.rpcEnv.awaitTermination()
+      _worker = null
+    }
+  }
+
   test("test isUseLocalNodeSSLConfig") {
     Worker.isUseLocalNodeSSLConfig(cmd("-Dasdf=dfgh")) shouldBe false
     Worker.isUseLocalNodeSSLConfig(cmd("-Dspark.ssl.useNodeLocalConf=true")) 
shouldBe true
@@ -65,9 +84,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   test("test clearing of finishedExecutors (small number of executors)") {
     val conf = new SparkConf()
     conf.set("spark.worker.ui.retainedExecutors", 2.toString)
-    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new 
SecurityManager(conf))
-    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, 
Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, new SecurityManager(conf))
+    val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 5) {
       worker.executors += s"app1/$i" -> createExecutorRunner(i)
@@ -91,9 +108,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   test("test clearing of finishedExecutors (more executors)") {
     val conf = new SparkConf()
     conf.set("spark.worker.ui.retainedExecutors", 30.toString)
-    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new 
SecurityManager(conf))
-    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, 
Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, new SecurityManager(conf))
+    val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 50) {
       worker.executors += s"app1/$i" -> createExecutorRunner(i)
@@ -126,9 +141,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   test("test clearing of finishedDrivers (small number of drivers)") {
     val conf = new SparkConf()
     conf.set("spark.worker.ui.retainedDrivers", 2.toString)
-    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new 
SecurityManager(conf))
-    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, 
Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, new SecurityManager(conf))
+    val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 5) {
       val driverId = s"driverId-$i"
@@ -152,9 +165,7 @@ class WorkerSuite extends SparkFunSuite with Matchers {
   test("test clearing of finishedDrivers (more drivers)") {
     val conf = new SparkConf()
     conf.set("spark.worker.ui.retainedDrivers", 30.toString)
-    val rpcEnv = RpcEnv.create("test", "localhost", 12345, conf, new 
SecurityManager(conf))
-    val worker = new Worker(rpcEnv, 50000, 20, 1234 * 5, 
Array.fill(1)(RpcAddress("1.2.3.4", 1234)),
-      "Worker", "/tmp", conf, new SecurityManager(conf))
+    val worker = makeWorker(conf)
     // initialize workers
     for (i <- 0 until 50) {
       val driverId = s"driverId-$i"


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to