Josh Rosen created SPARK-3105:
---------------------------------
Summary: Calling cache() after RDDs are pipelined has no effect in
PySpark
Key: SPARK-3105
URL: https://issues.apache.org/jira/browse/SPARK-3105
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 1.0.0, 1.1.0
Reporter: Josh Rosen
PySpark's PipelinedRDD decides whether to pipeline transformations by checking
whether those transformations are pipelinable _at the time that the
PipelinedRDD objects are created_ rather than at the time that we invoke
actions. This might lead to problems if we call {{cache()}} on an RDD after
it's already been used in a pipeline:
{code}
rdd1 = sc.parallelize(range(100)).map(lambda x: x)
rdd2 = rdd1.map(lambda x: 2 * x)
rdd1.cache()
rdd2.collect()
{code}
When I run this code, I'd expect {{cache()}} to break the pipeline and cache
intermediate results, but instead the two transformations are pipelined
together in Python, effectively ignoring the {{cache()}}.
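To make the mechanism concrete, here is a minimal toy model of a pipelining decision frozen at construction time. It needs no Spark installation. The names {{SourceRDD}}, {{StaticPipelinedRDD}}, {{materialize()}} and {{cached_result}} are hypothetical and only mimic the behavior described above; this is a sketch, not PySpark's actual implementation.

```python
# Toy model only: these classes are hypothetical stand-ins, not PySpark's real ones.

class SourceRDD:
    """A leaf dataset; it is never fused into a child."""

    def __init__(self, data):
        self.data = list(data)

    def map(self, f):
        return StaticPipelinedRDD(self, f)

    def materialize(self):
        return list(self.data)


class StaticPipelinedRDD:
    """Decides whether to fuse with its parent once, inside __init__."""

    def __init__(self, prev, f):
        self.is_cached = False
        self.cached_result = None
        if isinstance(prev, StaticPipelinedRDD) and not prev.is_cached:
            # The parent looked pipelinable at creation time, so fuse the
            # two functions and read directly from the grandparent.
            prev_func = prev.func
            self.func = lambda x: f(prev_func(x))
            self.parent = prev.parent
        else:
            self.func = f
            self.parent = prev

    def cache(self):
        self.is_cached = True
        return self

    def map(self, f):
        return StaticPipelinedRDD(self, f)

    def materialize(self):
        if self.cached_result is not None:
            return self.cached_result
        result = [self.func(x) for x in self.parent.materialize()]
        if self.is_cached:
            self.cached_result = result
        return result


rdd1 = SourceRDD(range(100)).map(lambda x: x)
rdd2 = rdd1.map(lambda x: 2 * x)
rdd1.cache()                  # too late: rdd2 already bypasses rdd1
doubled = rdd2.materialize()
print(rdd2.parent is rdd1)    # False
print(rdd1.cached_result)     # None
```

In this model {{rdd2}} never evaluates {{rdd1}} as a separate step, so the later {{cache()}} call has nothing to store.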
Note that {{cache()}} works properly if we call it before performing any other
transformations on the RDD:
{code}
rdd1 = sc.parallelize(range(100)).map(lambda x: x).cache()
rdd2 = rdd1.map(lambda x: 2 * x)
rdd2.collect()
{code}
This works as expected and caches {{rdd1}}.
To fix this, I think we should dynamically decide whether to pipeline when we
actually perform actions, rather than statically deciding when we create the RDDs.
We should also add tests for this.
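One possible shape for the dynamic approach, again as a Spark-free toy model: each RDD only records its parent and function, and the fusion boundary is discovered by walking the lineage when an action runs. The names {{SourceRDD}}, {{LazyPipelinedRDD}}, {{materialize()}} and {{cached_result}} are hypothetical, and this is a sketch under those assumptions rather than the actual fix. The closing asserts show the kind of regression check a real test might make.

```python
# Toy model only: hypothetical classes illustrating an action-time decision.

class SourceRDD:
    """A leaf dataset; it is never fused into a child."""

    def __init__(self, data):
        self.data = list(data)

    def map(self, f):
        return LazyPipelinedRDD(self, f)

    def materialize(self):
        return list(self.data)


class LazyPipelinedRDD:
    """Records its parent and function; fuses only when an action runs."""

    def __init__(self, prev, f):
        self.prev = prev
        self.func = f
        self.is_cached = False
        self.cached_result = None

    def cache(self):
        self.is_cached = True
        return self

    def map(self, f):
        return LazyPipelinedRDD(self, f)

    def materialize(self):
        if self.cached_result is not None:
            return self.cached_result
        # Walk up the lineage now, at action time, and stop at the first
        # ancestor that is cached or is not a pipelined RDD.
        funcs = [self.func]
        node = self.prev
        while isinstance(node, LazyPipelinedRDD) and not node.is_cached:
            funcs.append(node.func)
            node = node.prev
        funcs.reverse()  # apply the oldest transformation first
        result = []
        for x in node.materialize():
            for f in funcs:
                x = f(x)
            result.append(x)
        if self.is_cached:
            self.cached_result = result
        return result


rdd1 = SourceRDD(range(100)).map(lambda x: x)
rdd2 = rdd1.map(lambda x: 2 * x)
rdd1.cache()                  # called after rdd2 exists, as in the report
doubled = rdd2.materialize()

# Regression-style checks: the late cache() must still take effect.
assert rdd1.cached_result == list(range(100))
assert doubled == [2 * x for x in range(100)]
```

Because nothing is fused until {{materialize()}} runs, the order of {{cache()}} and {{map()}} no longer matters in this model.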
(Thanks to [~tdas] for pointing out this issue.)