This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new c08021c  [SPARK-26776][PYTHON] Reduce Py4J communication cost in PySpark's execution barrier check
c08021c is described below

commit c08021cd8734b3cc183e7f65312d14cdaa8541b7
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Wed Jan 30 12:24:27 2019 +0800

    [SPARK-26776][PYTHON] Reduce Py4J communication cost in PySpark's execution barrier check

    ## What changes were proposed in this pull request?

    While investigating flaky tests, I came across the following traceback:

    ```
      File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 2512, in __init__
        self.is_barrier = prev._is_barrier() or isFromBarrier
      File "/home/jenkins/workspace/SparkPullRequestBuilder/python/pyspark/rdd.py", line 2412, in _is_barrier
        return self._jrdd.rdd().isBarrier()
      File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 342, in get_return_value
        return OUTPUT_CONVERTER[type](answer[2:], gateway_client)
      File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 2492, in <lambda>
        lambda target_id, gateway_client: JavaObject(target_id, gateway_client))
      File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1324, in __init__
        ThreadSafeFinalizer.add_finalizer(key, value)
      File "/home/jenkins/workspace/SparkPullRequestBuilder/python/lib/py4j-0.10.8.1-src.zip/py4j/finalizer.py", line 43, in add_finalizer
        cls.finalizers[id] = weak_ref
      File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 216, in __exit__
        self.release()
      File "/usr/lib64/pypy-2.5.1/lib-python/2.7/threading.py", line 208, in release
        self.__block.release()
    error: release unlocked lock
    ```

    I assume it might not be directly related to the test itself, but I noticed that `prev._is_barrier()` makes a call through Py4J. Accessing the JVM via Py4J is expensive. Therefore, this PR proposes to avoid the Py4J access when `isFromBarrier` is `True`, by evaluating `isFromBarrier` first so that the `or` short-circuits before `prev._is_barrier()` is called.

    ## How was this patch tested?

    Existing unit tests should cover this.

    Closes #23690 from HyukjinKwon/minor-barrier.

    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 python/pyspark/rdd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 7396930..751df44 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2509,7 +2509,7 @@ class PipelinedRDD(RDD):
         self._jrdd_deserializer = self.ctx.serializer
         self._bypass_serializer = False
         self.partitioner = prev.partitioner if self.preservesPartitioning else None
-        self.is_barrier = prev._is_barrier() or isFromBarrier
+        self.is_barrier = isFromBarrier or prev._is_barrier()
 
     def getNumPartitions(self):
         return self._prev_jrdd.partitions().size()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org