[
https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14291229#comment-14291229
]
Imran Rashid commented on SPARK-2688:
-------------------------------------
I think there are a few different issues here, but they're not cleanly
separated into this issue and SPARK-3622.
1. How do we create multiple RDDs from one RDD?
2. What is the efficient way to compute a forked DAG like this, without
requiring you to recompute anything or persist?
3. Can spark support some new API (aka a "push model") that would allow this to
be done more efficiently?
(1) is just an API question. Obviously, it's possible to create multiple RDDs
from one RDD: you can just go over the input data multiple times, and you can
have a util function do it for you, like {{rdd.randomSplit}}. But should we
make an API that is more convenient for this, e.g. something like
{code}
def filterMultiple(f: PartialFunction[T,K]): Map[K,RDD[T]]
{code}
e.g. you'd use it something like
{code}
val filteredRdd: Map[String, RDD[Int]] = rdd.filterMultiple {
  case x if x > 1000 => "big"
  case x if x > 0 && x % 2 == 0 => "small even"
  case x if x > 0 => "small odd"
  // ignore negative x
}
val bigXs = filteredRdd("big")
...
{code}
and then I suppose you'd need at least the {{mapPartitionsWithIndex}} variant
as well. We could certainly add an API like that, but I think it would only
mislead the user into expecting things to be happening concurrently, when
they're not really. (In fact, I even worry about this with {{randomSplit}}.
If the input RDD isn't cached, it'll get recomputed. Even if it is, the
sampler gets executed over and over again.)
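To make the concern concrete, here is a sketch of what such a {{filterMultiple}} would have to do, written against plain Scala collections rather than real RDDs ({{filterMultiple}} is the hypothetical method proposed above, not an existing API). Each output is just an independent filter over the same input, so the input gets traversed once per key -- nothing actually happens concurrently:

{code}
// Hypothetical filterMultiple, modeled on Seq instead of RDD.
// Each output key triggers its own full pass over `data`, which is
// exactly what N derived RDDs do to their shared, uncached parent.
def filterMultiple[T, K](data: Seq[T])(f: PartialFunction[T, K]): Map[K, Seq[T]] = {
  // find every key the partial function can produce on this data
  val keys = data.collect(f).distinct
  // one independent filter pass per key
  keys.map { k =>
    k -> data.filter(x => f.isDefinedAt(x) && f(x) == k)
  }.toMap
}

val split = filterMultiple(Seq(1500, 4, 7, -3, 2000)) {
  case x if x > 1000            => "big"
  case x if x > 0 && x % 2 == 0 => "small even"
  case x if x > 0               => "small odd"
  // negative x is not matched and is dropped
}
// split("big") == Seq(1500, 2000)
{code}

The convenience is real, but the cost model is the same as calling {{filter}} N times by hand.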
(2) & (3) are a mix of API & efficiency. For (2), you don't have to recompute
nor hit disk ... but that's assuming you've got the memory to cache the result
in memory. It's easy to imagine cases where you don't want to recompute the
intermediate, or use disk or memory for it, e.g.
{code}
100 TB rdd 1 -->100 TB rdd 2--> 50 TB rdd 3 --> save to hdfs path A
\---> 50 TB rdd 4 --> save to hdfs path B
{code}
right now I think the answer to (2) is that you have to either recompute the
intermediate, store it on disk, or store it in memory.
For (3), I think to really support this properly we'd need a rather
different API. I agree with you [~sowen] that "it seems like this only works if
the N output RDDs are persisted, since you have to immediately save the
results" -- but I think it's a bit more general than that: you just need some
action queued up on each output RDD. This is what I was suggesting with my
{{MultiRDD}} comment on SPARK-3622.
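For a sense of what the "push model" means, here is a hedged sketch in plain Scala ({{MultiRDD}} is only a name from the discussion, not a real Spark class, and {{pushOnce}} is invented for illustration): one traversal of the parent feeds every queued-up consumer, instead of one traversal per output:

{code}
// Push model in miniature: the parent is iterated exactly once, and each
// element is pushed to all registered sinks. In real Spark terms, each
// sink would be the queued-up action on one output RDD.
def pushOnce[T](parent: Iterator[T], sinks: Seq[T => Unit]): Unit =
  parent.foreach { x => sinks.foreach(sink => sink(x)) }

val big   = scala.collection.mutable.Buffer[Int]()
val small = scala.collection.mutable.Buffer[Int]()
pushOnce(Iterator(1500, 4, 2000, 7), Seq(
  x => if (x > 1000) big += x,    // stands in for "save to hdfs path A"
  x => if (x <= 1000) small += x  // stands in for "save to hdfs path B"
))
// big == Buffer(1500, 2000); small == Buffer(4, 7)
{code}

The catch is exactly the one noted above: this only works when every consumer is known (queued up) before the single pass starts.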
> Need a way to run multiple data pipeline concurrently
> -----------------------------------------------------
>
> Key: SPARK-2688
> URL: https://issues.apache.org/jira/browse/SPARK-2688
> Project: Spark
> Issue Type: New Feature
> Components: Spark Core
> Affects Versions: 1.0.1
> Reporter: Xuefu Zhang
>
> Suppose we want to do the following data processing:
> {code}
> rdd1 -> rdd2 -> rdd3
>            \ -> rdd4
>            \ -> rdd5
>            \ -> rdd6
> {code}
> where -> represents a transformation. rdd3 to rdd6 are all derived from an
> intermediate rdd2. We use foreach(fn) with a dummy function to trigger the
> execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2
> -> rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be
> recomputed. This is very inefficient. Ideally, we should be able to trigger
> the execution of the whole graph and reuse rdd2, but there doesn't seem to
> be a way to do so. Tez already realized the importance of this (TEZ-391), so
> I think Spark should provide this too.
> This is required for Hive to support multi-insert queries. HIVE-7292.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)