[ 
https://issues.apache.org/jira/browse/SPARK-2818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lu Lu updated SPARK-2818:
-------------------------

    Description: 
If the RDDs being joined originate from the same cached RDD, the DAGScheduler 
will submit redundant stages to compute and cache the common parent.
For example:

{code}
    val edges = sc.textFile(...).cache()
    val bigSrc = edges.groupByKey().filter(...)
    val reversed = edges.map(edge => (edge._2, edge._1))
    val bigDst = reversed.groupByKey().filter(...)
    bigSrc.join(bigDst).count
{code}

The final count action triggers two stages that both compute the edges RDD. 
This causes two performance problems:
(1) If resources are sufficient, the two stages run concurrently and read 
the same HDFS file at the same time.
(2) If the two stages run one after the other, the tasks of the later stage 
can read the cached blocks of the edges RDD directly, but that stage cannot 
achieve data locality because the block locations are not known when the 
stages are submitted.
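
One possible way to sidestep both problems until the scheduler handles this case is to materialize the cached parent with a cheap action before the join is submitted. The sketch below is only an illustration and not part of the original report: the object name, the bigSrcJoinBigDst helper, the minDegree threshold and the sample edges are all hypothetical, and it assumes a Spark version in which the pair-RDD implicits are picked up automatically (older 1.x releases also need import org.apache.spark.SparkContext._). It costs one extra job and assumes the edges RDD fits in the cache.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SharedCachedParentJoin {
  // Hypothetical helper: counts the keys that are both a "big" source and a
  // "big" destination, where "big" means at least minDegree neighbours.
  def bigSrcJoinBigDst(edges: RDD[(Long, Long)], minDegree: Int): Long = {
    edges.cache()
    // Run a cheap action first so the cached blocks of edges already exist
    // (with known locations) when the two shuffle stages below are submitted.
    edges.count()

    val bigSrc = edges.groupByKey().filter { case (_, dsts) => dsts.size >= minDegree }
    val reversed = edges.map { case (src, dst) => (dst, src) }
    val bigDst = reversed.groupByKey().filter { case (_, srcs) => srcs.size >= minDegree }
    bigSrc.join(bigDst).count()
  }

  def main(args: Array[String]): Unit = {
    // Local master and sample data are assumptions for illustration only.
    val conf = new SparkConf().setAppName("SPARK-2818-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val edges = sc.parallelize(Seq((1L, 2L), (1L, 3L), (2L, 1L), (3L, 1L)))
      println(bigSrcJoinBigDst(edges, minDegree = 2))
    } finally {
      sc.stop()
    }
  }
}
```

With the parent materialized up front, both stages should read edges from the cache rather than from HDFS, and the scheduler can take the cached block locations into account when placing their tasks.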

  was:
If the RDDs being joined originate from the same cached RDD a, the DAGScheduler 
will submit redundant stages to compute and cache the RDD a.
For example:

{code}
    val edges = sc.textFile(...).cache()
    val bigSrc = edges.groupByKey().filter(...)
    val reversed = edges.map(edge => (edge._2, edge._1))
    val bigDst = reversed.groupByKey().filter(...)
    bigSrc.join(bigDst).count
{code}

The final count action triggers two stages that both compute the edges RDD. 
This causes two performance problems:
(1) If resources are sufficient, the two stages run concurrently and read 
the same HDFS file at the same time.
(2) If the two stages run one after the other, the tasks of the later stage 
can read the cached blocks of the edges RDD directly, but that stage cannot 
achieve data locality because the block locations are not known when the 
stages are submitted.


> Improve joining RDDs that are transformed from the same cached RDD
> ------------------------------------------------------------------
>
>                 Key: SPARK-2818
>                 URL: https://issues.apache.org/jira/browse/SPARK-2818
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Lu Lu


