Repository: spark Updated Branches: refs/heads/master 055714661 -> 335f10eda
[SPARK-7997][CORE] Add rpcEnv.awaitTermination() back to SparkEnv `rpcEnv.awaitTermination()` was not added in #10854 because some Streaming Python tests hung forever. This patch fixed the hung issue and added rpcEnv.awaitTermination() back to SparkEnv. Previously, Streaming Kafka Python tests shutdowns the zookeeper server before stopping StreamingContext. Then when stopping StreamingContext, KafkaReceiver may be hung due to https://issues.apache.org/jira/browse/KAFKA-601, hence, some thread of RpcEnv's Dispatcher cannot exit and rpcEnv.awaitTermination is hung.The patch just changed the shutdown order to fix it. Author: Shixiong Zhu <[email protected]> Closes #11031 from zsxwing/awaitTermination. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/335f10ed Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/335f10ed Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/335f10ed Branch: refs/heads/master Commit: 335f10edad8c759bad3dbd0660ed4dd5d70ddd8b Parents: 0557146 Author: Shixiong Zhu <[email protected]> Authored: Tue Feb 2 21:13:54 2016 -0800 Committer: Reynold Xin <[email protected]> Committed: Tue Feb 2 21:13:54 2016 -0800 ---------------------------------------------------------------------- core/src/main/scala/org/apache/spark/SparkEnv.scala | 1 + python/pyspark/streaming/tests.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/335f10ed/core/src/main/scala/org/apache/spark/SparkEnv.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 12c7b20..9461afd 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -91,6 +91,7 @@ class SparkEnv ( metricsSystem.stop() outputCommitCoordinator.stop() rpcEnv.shutdown() + rpcEnv.awaitTermination() // Note that blockTransferService is stopped by BlockManager since it is started by it. http://git-wip-us.apache.org/repos/asf/spark/blob/335f10ed/python/pyspark/streaming/tests.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py index 24b8126..b33e825 100644 --- a/python/pyspark/streaming/tests.py +++ b/python/pyspark/streaming/tests.py @@ -1013,12 +1013,12 @@ class KafkaStreamTests(PySparkStreamingTestCase): self._kafkaTestUtils.setup() def tearDown(self): + super(KafkaStreamTests, self).tearDown() + if self._kafkaTestUtils is not None: self._kafkaTestUtils.teardown() self._kafkaTestUtils = None - super(KafkaStreamTests, self).tearDown() - def _randomTopic(self): return "topic-%d" % random.randint(0, 10000) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
