[
https://issues.apache.org/jira/browse/SPARK-2688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14289771#comment-14289771
]
Harry Brundage commented on SPARK-2688:
---------------------------------------
I respectfully disagree :)
Persist is one way to implement this functionality, in the same way that
intermediate HDFS writes between MapReduce jobs are one way to implement Hive
queries: it is the slow path, and it isn't necessary. Even if your dataset fits
in memory, it's still slower to cache and reuse it than to just fly through
HDFS read -> Spark job -> HDFS write as fast as possible, and if the data
doesn't fit in memory, you are now forced to pay a disk-write penalty for
persist(MEMORY_AND_DISK) that isn't strictly necessary. From what I understand,
avoiding intermediate persist steps that slow everything down needlessly is
explicitly Spark's purpose: you stream through memory, or iterate on in-memory
data, all nice and fast like. I'd also like to note that Matei posted about
doing this once on the mailing list here:
http://apache-spark-user-list.1001560.n3.nabble.com/Is-deferred-execution-of-multiple-RDDs-ever-coming-td10310.html
I've spoken to Josh Rosen and Sandy about this, both briefly, and from what I
understand, this is really hard. The main problem is rooted throughout the
whole project: RDDs pull data from their parent stages, as opposed to having
data pushed at them. If data were pushed, then the double-output thing would be
decidedly easier, but since it is pulled, the parent stage needs some
buffer/persistence mechanism to keep intermediate data around until all the
downstream dependents are ready to ask for it. I am not really sure how
changing the model from pull to push would affect the R part of RDD, but I
think it's definitely a consideration.
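To make the pull-model problem concrete outside Spark: a Python generator is a pull-based, single-pass stream, and feeding one to two independent consumers already requires a buffer (itertools.tee), which is a small-scale version of the persistence an RDD's parent stage would need. A minimal sketch of the analogy, not Spark code:

```python
import itertools

def parent_stage():
    # A one-pass "parent stage": a generator that can be pulled from only once.
    for i in range(5):
        yield i * i

# Two downstream dependents pulling independently from one parent need a
# buffer between them. itertools.tee keeps each item around until the slower
# consumer has seen it -- that buffering is exactly the persistence cost.
left, right = itertools.tee(parent_stage(), 2)

doubled = [x * 2 for x in left]   # first dependent drains the stream
negated = [-x for x in right]     # second dependent replays the buffer

print(doubled)  # [0, 2, 8, 18, 32]
print(negated)  # [0, -1, -4, -9, -16]
```

If the parent pushed its output at both consumers as it produced each item, no buffer would be needed, which is the appeal of the push model discussed above.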
To root this discussion in reality a bit more, I have an example use case I
think is valid. We very often materialize large volumes of JSON logs from raw
sources like a cloud-based syslog sink, or aggregated nginx logs, or what have
you. In the vast majority of circumstances all the JSON lines parse just fine,
but a very small select few are broken. We don't want to tear down the whole
Spark job because one line is invalid, but these things seem inevitable,
especially when dealing with stuff from the web, as most Hadoop users are. So,
we rescue JSON parse errors, but we want to log that it happened somehow, and
hopefully leave the data around for manual inspection. Ralph Kimball calls
this a "reject bin": you stick the malformed stuff in a secondary location and
ignore it if the volume is really low, but you always have it around to go
back to, in case you've been systematically ignoring something you shouldn't,
or the volume of rejects gets too high for you to be confident they really are
just random cosmic-ray breakage. We don't want to .persist the entire mass of
JSON data the moment we lift it off disk, since we are probably going to
filter it down or manipulate it very soon after, but unless we persist it, we
have no way to "fork" the RDD into the good-quality parsed JSON data for the
rest of the job and the bad-quality JSON data we want to write to some
secondary reject location.
Feel free to suggest a different persistence mechanism for the rejects (map
over the data and log it to some other service, or whatever), but I don't
really like that solution. We would need to implement some external state
store when we have HDFS handy, we would need to resolve duplicates when a
Spark task dies for whatever reason (YARN preemption, in our case) and then
re-executes and re-logs broken data, and now we have a whole other operational
component. For the time being, we're just logging broken JSON in the executor
logs, but those get reaped frequently and are far from user-friendly to query.
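Spark specifics aside, the reject-bin split itself is simple. A minimal plain-Python sketch of the parse-and-partition step (the function name parse_with_rejects is made up for illustration):

```python
import json

def parse_with_rejects(lines):
    """Split raw JSON lines into parsed records and a reject bin.

    Returns (good, rejects): good holds the parsed objects, rejects holds
    the raw lines that failed to parse, kept around for later inspection.
    """
    good, rejects = [], []
    for line in lines:
        try:
            good.append(json.loads(line))
        except ValueError:
            # Rescue the parse error instead of failing the whole job.
            rejects.append(line)
    return good, rejects

raw = ['{"status": 200}', 'not json at all', '{"status": 500}']
good, rejects = parse_with_rejects(raw)
print(len(good), len(rejects))  # 2 1
```

In Spark, this split would naturally be two filters over the same parsed RDD, written to two different output locations, which is exactly the fork that currently forces a .persist.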
So, I suggest keeping this ticket open to evaluate whether switching to
pushing data through the DAG is acceptable. If you folks don't want to do it,
that's totally fine, but I would like to make sure that is the case.
> Need a way to run multiple data pipeline concurrently
> -----------------------------------------------------
>
> Key: SPARK-2688
> URL: https://issues.apache.org/jira/browse/SPARK-2688
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Affects Versions: 1.0.1
> Reporter: Xuefu Zhang
>
> Suppose we want to do the following data processing:
> {code}
> rdd1 -> rdd2 -> rdd3
>           |  -> rdd4
>           |  -> rdd5
>           \  -> rdd6
> {code}
> where -> represents a transformation. rdd3 to rdd6 are all derived from an
> intermediate rdd2. We use foreach(fn) with a dummy function to trigger the
> execution. However, rdd.foreach(fn) only triggers the pipeline rdd1 -> rdd2 ->
> rdd3. To make things worse, when we call rdd4.foreach(), rdd2 will be
> recomputed. This is very inefficient. Ideally, we should be able to trigger
> the execution of the whole graph and reuse rdd2, but there doesn't seem to be
> a way of doing so. Tez already realized the importance of this (TEZ-391), so I
> think Spark should provide this too.
> This is required for Hive to support multi-insert queries. HIVE-7292.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)