Josh Rosen created SPARK-3105:
---------------------------------

             Summary: Calling cache() after RDDs are pipelined has no effect in PySpark
                 Key: SPARK-3105
                 URL: https://issues.apache.org/jira/browse/SPARK-3105
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 1.0.0, 1.1.0
            Reporter: Josh Rosen


PySpark's PipelinedRDD decides whether to pipeline transformations by checking 
whether those transformations are pipelinable _at the time that the 
PipelinedRDD objects are created_ rather than at the time that we invoke 
actions.  This causes problems if we call {{cache()}} on an RDD after it has 
already been used in a pipeline:

{code}
rdd1 = sc.parallelize(range(100)).map(lambda x: x)
rdd2 = rdd1.map(lambda x: 2 * x)
rdd1.cache()
rdd2.collect()
{code}

When I run this code, I'd expect {{cache()}} to break the pipeline and cache 
the intermediate results, but instead the two transformations are pipelined 
together in Python, effectively ignoring the {{cache()}} call.
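
For context, this is roughly the creation-time check in {{PipelinedRDD}} 
(simplified from {{python/pyspark/rdd.py}}, with serializer and partitioning 
bookkeeping elided):

{code}
class PipelinedRDD(RDD):
    def __init__(self, prev, func):
        # The pipelining decision is made here, when this object is
        # created, using the parent's cache status at that moment:
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
            # prev starts a new pipeline: keep the functions separate.
            self.func = func
            self._prev_jrdd = prev._jrdd
        else:
            # Fuse with prev's pipeline by composing the functions.
            # A later prev.cache() cannot undo this composition.
            prev_func = prev.func

            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))

            self.func = pipeline_func
            self._prev_jrdd = prev._prev_jrdd  # skips over prev entirely

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed)
{code}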

Note that {{cache()}} works properly if we call it before performing any other 
transformations on the RDD:

{code}
rdd1 = sc.parallelize(range(100)).map(lambda x: x).cache()
rdd2 = rdd1.map(lambda x: 2 * x)
rdd2.collect()
{code}

This works as expected and caches {{rdd1}}.

To fix this, I think we should decide dynamically whether to pipeline when we 
actually perform actions, rather than deciding statically when we create the 
RDDs.
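
One way to do this (an illustrative sketch only, not a finished patch; 
{{_prev}}, {{_compose}}, and {{_build_python_rdd}} are hypothetical names) 
would be to record the parent RDD and each stage's function separately, then 
defer the composition to the lazily-evaluated {{_jrdd}} property, re-checking 
{{_is_pipelinable()}} at that point:

{code}
class PipelinedRDD(RDD):
    def __init__(self, prev, func):
        # Record the parent and this stage's function; decide nothing yet.
        self._prev = prev
        self.func = func

    @property
    def _jrdd(self):
        # Decide pipelining here, when an action finally runs: walk up
        # the parent chain, fusing functions only across parents that
        # are still pipelinable (i.e. not cached or checkpointed by now).
        func = self.func
        prev = self._prev
        while isinstance(prev, PipelinedRDD) and prev._is_pipelinable():
            func = _compose(prev.func, func)  # hypothetical helper
            prev = prev._prev
        return _build_python_rdd(prev, func)  # hypothetical helper
{code}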

We should also add tests for this.
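
For example, a regression test along these lines (an illustrative sketch in 
the style of {{python/pyspark/tests.py}}; the test name and counting approach 
are mine) could use an accumulator to verify that the cached RDD is computed 
only once:

{code}
def test_cache_after_pipelining(self):
    eval_count = self.sc.accumulator(0)

    def identity_with_count(x):
        eval_count.add(1)
        return x

    rdd1 = self.sc.parallelize(range(100)).map(identity_with_count)
    rdd2 = rdd1.map(lambda x: 2 * x)
    rdd1.cache()
    rdd2.collect()  # should materialize rdd1 in the cache
    rdd2.collect()  # should read rdd1 from the cache
    # If cache() broke the pipeline, rdd1's map ran only during the
    # first collect(); without the fix it runs twice and we see 200.
    self.assertEqual(100, eval_count.value)
{code}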

(Thanks to [~tdas] for pointing out this issue.)


