This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 98e831d  [SPARK-25921][FOLLOW UP][PYSPARK] Fix barrier task run without BarrierTaskContext while python worker reuse
98e831d is described below

commit 98e831d32115aeec4ded235bf42978f407e376da
Author: Yuanjian Li <xyliyuanj...@gmail.com>
AuthorDate: Fri Jan 11 14:28:37 2019 +0800

    [SPARK-25921][FOLLOW UP][PYSPARK] Fix barrier task run without BarrierTaskContext while python worker reuse
    
    ## What changes were proposed in this pull request?
    
    It's the follow-up PR for #22962 and contains the following work:
    - Remove `__init__` in TaskContext and BarrierTaskContext.
    - Add more comments to explain the fix.
    - Rewrite UT in a new class.
    
    ## How was this patch tested?
    
    New UT in test_taskcontext.py
    
    Closes #23435 from xuanyuanking/SPARK-25921-follow.
    
    Authored-by: Yuanjian Li <xyliyuanj...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/taskcontext.py            | 14 +++-----
 python/pyspark/tests/test_taskcontext.py | 57 +++++++++++++++++++++++---------
 2 files changed, 46 insertions(+), 25 deletions(-)

diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py
index 98b505c..de4b6af 100644
--- a/python/pyspark/taskcontext.py
+++ b/python/pyspark/taskcontext.py
@@ -48,10 +48,6 @@ class TaskContext(object):
         cls._taskContext = taskContext = object.__new__(cls)
         return taskContext
 
-    def __init__(self):
-        """Construct a TaskContext, use get instead"""
-        pass
-
     @classmethod
     def _getOrCreate(cls):
         """Internal function to get or create global TaskContext."""
@@ -140,13 +136,13 @@ class BarrierTaskContext(TaskContext):
     _port = None
     _secret = None
 
-    def __init__(self):
-        """Construct a BarrierTaskContext, use get instead"""
-        pass
-
     @classmethod
     def _getOrCreate(cls):
-        """Internal function to get or create global BarrierTaskContext."""
+        """
+        Internal function to get or create global BarrierTaskContext. We need 
to make sure
+        BarrierTaskContext is returned from here because it is needed in 
python worker reuse
+        scenario, see SPARK-25921 for more details.
+        """
         if not isinstance(cls._taskContext, BarrierTaskContext):
             cls._taskContext = object.__new__(cls)
         return cls._taskContext
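
For reference, here is a minimal standalone sketch of the singleton behaviour this
hunk relies on (a simplification, not the actual pyspark source; the real classes
also carry worker state and accessors). In a reused worker, a plain TaskContext
cached by an earlier non-barrier task must be replaced, which is what the
isinstance check above does; the removed __init__ overrides were empty no-ops, so
dropping them changes nothing:

    class TaskContext(object):
        _taskContext = None

        @classmethod
        def _getOrCreate(cls):
            # Plain tasks cache one TaskContext singleton per worker.
            if cls._taskContext is None:
                cls._taskContext = object.__new__(cls)
            return cls._taskContext

    class BarrierTaskContext(TaskContext):
        @classmethod
        def _getOrCreate(cls):
            # A reused worker may still hold a plain TaskContext from a
            # previous task; replace it so barrier tasks see the right
            # type (SPARK-25921).
            if not isinstance(cls._taskContext, BarrierTaskContext):
                cls._taskContext = object.__new__(cls)
            return cls._taskContext

    # Simulate worker reuse: a non-barrier task runs first...
    TaskContext._getOrCreate()
    # ...and a later barrier task in the same worker still gets the right type.
    assert isinstance(BarrierTaskContext._getOrCreate(), BarrierTaskContext)
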
diff --git a/python/pyspark/tests/test_taskcontext.py b/python/pyspark/tests/test_taskcontext.py
index b3a9674..fdb5c40 100644
--- a/python/pyspark/tests/test_taskcontext.py
+++ b/python/pyspark/tests/test_taskcontext.py
@@ -14,11 +14,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import os
 import random
 import sys
 import time
+import unittest
 
-from pyspark import SparkContext, TaskContext, BarrierTaskContext
+from pyspark import SparkConf, SparkContext, TaskContext, BarrierTaskContext
 from pyspark.testing.utils import PySparkTestCase
 
 
@@ -118,21 +120,6 @@ class TaskContextTests(PySparkTestCase):
         times = rdd.barrier().mapPartitions(f).map(context_barrier).collect()
         self.assertTrue(max(times) - min(times) < 1)
 
-    def test_barrier_with_python_worker_reuse(self):
-        """
-        Verify that BarrierTaskContext.barrier() with reused python worker.
-        """
-        self.sc._conf.set("spark.python.work.reuse", "true")
-        rdd = self.sc.parallelize(range(4), 4)
-        # start a normal job first to start all worker
-        result = rdd.map(lambda x: x ** 2).collect()
-        self.assertEqual([0, 1, 4, 9], result)
-        # make sure `spark.python.work.reuse=true`
-        self.assertEqual(self.sc._conf.get("spark.python.work.reuse"), "true")
-
-        # worker will be reused in this barrier job
-        self.test_barrier()
-
     def test_barrier_infos(self):
         """
         Verify that BarrierTaskContext.getTaskInfos() returns a list of all task infos in the
@@ -149,6 +136,44 @@ class TaskContextTests(PySparkTestCase):
         self.assertTrue(len(taskInfos[0]) == 4)
 
 
+class TaskContextTestsWithWorkerReuse(unittest.TestCase):
+
+    def setUp(self):
+        class_name = self.__class__.__name__
+        conf = SparkConf().set("spark.python.worker.reuse", "true")
+        self.sc = SparkContext('local[2]', class_name, conf=conf)
+
+    def test_barrier_with_python_worker_reuse(self):
+        """
+        Regression test for SPARK-25921: verify that BarrierTaskContext.barrier() works
+        with a reused python worker.
+        """
+        # start a normal job first to start all workers and get all worker pids
+        worker_pids = self.sc.parallelize(range(2), 2).map(lambda x: os.getpid()).collect()
+        # the workers will be reused in this barrier job
+        rdd = self.sc.parallelize(range(10), 2)
+
+        def f(iterator):
+            yield sum(iterator)
+
+        def context_barrier(x):
+            tc = BarrierTaskContext.get()
+            time.sleep(random.randint(1, 10))
+            tc.barrier()
+            return (time.time(), os.getpid())
+
+        result = rdd.barrier().mapPartitions(f).map(context_barrier).collect()
+        times = list(map(lambda x: x[0], result))
+        pids = list(map(lambda x: x[1], result))
+        # check both barrier and worker reuse effect
+        self.assertTrue(max(times) - min(times) < 1)
+        for pid in pids:
+            self.assertTrue(pid in worker_pids)
+
+    def tearDown(self):
+        self.sc.stop()
+
+
 if __name__ == "__main__":
     import unittest
     from pyspark.tests.test_taskcontext import *
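
The new test exercises the user-facing pattern below. As a rough, self-contained
sketch (it assumes a local PySpark installation; the app name and data sizes are
arbitrary), a barrier stage on reused python workers looks like this:

    import os
    from pyspark import SparkConf, SparkContext, BarrierTaskContext

    conf = SparkConf().set("spark.python.worker.reuse", "true")
    sc = SparkContext("local[2]", "barrier-reuse-demo", conf=conf)

    # Run a normal job first so the python workers are started and can be reused.
    first_pids = sc.parallelize(range(2), 2).map(lambda x: os.getpid()).collect()

    def stage(iterator):
        # With the fix, get() returns a BarrierTaskContext even in a worker
        # that already served a non-barrier task.
        tc = BarrierTaskContext.get()
        tc.barrier()  # block until every task in the stage reaches this point
        yield (tc.partitionId(), os.getpid())

    print(sc.parallelize(range(10), 2).barrier().mapPartitions(stage).collect())
    sc.stop()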


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
