This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c08021c  [SPARK-26776][PYTHON] Reduce Py4J communication cost in PySpark's execution barrier check
c08021c is described below

commit c08021cd8734b3cc183e7f65312d14cdaa8541b7
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Wed Jan 30 12:24:27 2019 +0800

    [SPARK-26776][PYTHON] Reduce Py4J communication cost in PySpark's execution barrier check
    
    ## What changes were proposed in this pull request?
    
    While investigating flaky tests, I came across the following traceback:
    
    ```
          File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 2512, in __init__
            self.is_barrier = prev._is_barrier() or isFromBarrier
          File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 2412, in _is_barrier
            return self._jrdd.rdd().isBarrier()
          File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
            answer, self.gateway_client, self.target_id, self.name)
          File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 342, in get_return_value
            return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
          File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 2492, in <lambda>
            lambda target_id, gateway_client: JavaObject(target_id, gateway_client))
          File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1324, in __init__
            ThreadSafeFinalizer.add_finalizer(key, value)
          File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py", line 43, in add_finalizer
            cls.finalizers[id] = weak_ref
          File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in __exit__
            self.release()
          File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in release
            self.__block.release()
        error: release unlocked lock
    ```
    
    This failure may not be directly related to the test itself, but it shows that `prev._is_barrier()` goes through Py4J: it calls `self._jrdd.rdd().isBarrier()` on the JVM side.
    
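    Each Py4J access is a round trip to the JVM and is expensive. This PR therefore swaps the operands of `or` so that, thanks to short-circuit evaluation, the Py4J access is skipped whenever `isFromBarrier` is `True`. The resulting value of `is_barrier` is unchanged.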
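    The effect of swapping the operands can be illustrated with a minimal sketch in plain Python. `FakePrevRDD`, `old_order` and `new_order` are hypothetical names used only for illustration; here `_is_barrier()` merely counts calls instead of reaching the JVM through Py4J.

```python
class FakePrevRDD:
    """Hypothetical stand-in for the parent RDD; counts simulated Py4J round trips."""

    def __init__(self, barrier):
        self.barrier = barrier
        self.jvm_calls = 0

    def _is_barrier(self):
        # In PySpark this would be self._jrdd.rdd().isBarrier(), a call into the JVM.
        self.jvm_calls += 1
        return self.barrier


def old_order(prev, isFromBarrier):
    # Before the patch: the JVM is always queried first.
    return prev._is_barrier() or isFromBarrier


def new_order(prev, isFromBarrier):
    # After the patch: `or` short-circuits, so the JVM is only
    # queried when isFromBarrier is False.
    return isFromBarrier or prev._is_barrier()


for isFromBarrier in (True, False):
    for barrier in (True, False):
        old_prev, new_prev = FakePrevRDD(barrier), FakePrevRDD(barrier)
        # Both orders compute the same result...
        assert old_order(old_prev, isFromBarrier) == new_order(new_prev, isFromBarrier)
        # ...but only the old order always pays for the round trip.
        print(isFromBarrier, barrier, old_prev.jvm_calls, new_prev.jvm_calls)
```

    Both functions agree for every combination of inputs, but only `new_order` avoids the simulated round trip when `isFromBarrier` is `True`.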
    
    ## How was this patch tested?
    
    Existing unit tests should cover this.
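    If a dedicated regression test were wanted, a sketch along these lines could pin the behaviour down with `unittest.mock`. `compute_is_barrier` is a hypothetical helper that mirrors the patched expression; it is not part of PySpark, and a mock replaces the real parent RDD.

```python
import unittest
from unittest import mock


def compute_is_barrier(prev, isFromBarrier):
    # Hypothetical helper mirroring the patched line in PipelinedRDD.__init__.
    return isFromBarrier or prev._is_barrier()


class BarrierCheckTest(unittest.TestCase):
    def test_skips_jvm_call_when_from_barrier(self):
        prev = mock.Mock()
        self.assertTrue(compute_is_barrier(prev, True))
        # Short-circuit: the (mocked) Py4J-backed check is never invoked.
        prev._is_barrier.assert_not_called()

    def test_queries_jvm_when_not_from_barrier(self):
        prev = mock.Mock()
        prev._is_barrier.return_value = True
        self.assertTrue(compute_is_barrier(prev, False))
        # Without the barrier flag, the parent must still be asked exactly once.
        prev._is_barrier.assert_called_once_with()
```

    The test case can be run with the standard `unittest` runner.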
    
    Closes #23690 from HyukjinKwon/minor-barrier.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 7396930..751df44 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2509,7 +2509,7 @@ class PipelinedRDD(RDD):
         self._jrdd_deserializer = self.ctx.serializer
         self._bypass_serializer = False
         self.partitioner = prev.partitioner if self.preservesPartitioning else None
-        self.is_barrier = prev._is_barrier() or isFromBarrier
+        self.is_barrier = isFromBarrier or prev._is_barrier()
 
     def getNumPartitions(self):
         return self._prev_jrdd.partitions().size()

