Repository: spark
Updated Branches:
  refs/heads/master 055714661 -> 335f10eda


[SPARK-7997][CORE] Add rpcEnv.awaitTermination() back to SparkEnv

`rpcEnv.awaitTermination()` was not added in #10854 because some Streaming 
Python tests hung forever.

This patch fixed the hung issue and added rpcEnv.awaitTermination() back to 
SparkEnv.

Previously, Streaming Kafka Python tests shutdowns the zookeeper server before 
stopping StreamingContext. Then when stopping StreamingContext, KafkaReceiver 
may be hung due to https://issues.apache.org/jira/browse/KAFKA-601, hence, some 
thread of RpcEnv's Dispatcher cannot exit and rpcEnv.awaitTermination is 
hung.The patch just changed the shutdown order to fix it.

Author: Shixiong Zhu <[email protected]>

Closes #11031 from zsxwing/awaitTermination.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/335f10ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/335f10ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/335f10ed

Branch: refs/heads/master
Commit: 335f10edad8c759bad3dbd0660ed4dd5d70ddd8b
Parents: 0557146
Author: Shixiong Zhu <[email protected]>
Authored: Tue Feb 2 21:13:54 2016 -0800
Committer: Reynold Xin <[email protected]>
Committed: Tue Feb 2 21:13:54 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkEnv.scala | 1 +
 python/pyspark/streaming/tests.py                   | 4 ++--
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/335f10ed/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala 
b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 12c7b20..9461afd 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -91,6 +91,7 @@ class SparkEnv (
       metricsSystem.stop()
       outputCommitCoordinator.stop()
       rpcEnv.shutdown()
+      rpcEnv.awaitTermination()
 
       // Note that blockTransferService is stopped by BlockManager since it is 
started by it.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/335f10ed/python/pyspark/streaming/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/tests.py 
b/python/pyspark/streaming/tests.py
index 24b8126..b33e825 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1013,12 +1013,12 @@ class KafkaStreamTests(PySparkStreamingTestCase):
         self._kafkaTestUtils.setup()
 
     def tearDown(self):
+        super(KafkaStreamTests, self).tearDown()
+
         if self._kafkaTestUtils is not None:
             self._kafkaTestUtils.teardown()
             self._kafkaTestUtils = None
 
-        super(KafkaStreamTests, self).tearDown()
-
     def _randomTopic(self):
         return "topic-%d" % random.randint(0, 10000)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to