[ https://issues.apache.org/jira/browse/CRUNCH-510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14485898#comment-14485898 ]

Josh Wills commented on CRUNCH-510:
-----------------------------------

I wrote a collect-based materialization scheme for Spark-Dataflow:

https://github.com/cloudera/spark-dataflow/blob/master/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java

...but you're right, I was lazy when I did SparkPipeline and re-used the 
MRPipeline materialization logic. The main reason was actually mapside joins, 
which (right now) rely on a materialized file existing in HDFS that can then be 
read into the distributed cache and passed out to subsequent job runs. So for 
that logic to keep working, SparkPipeline.materialize would need to be smart 
enough to decide how/where to materialize the data based on the client 
context.
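A context-aware materialize could look roughly like the toy sketch below. All names here are illustrative, not actual Crunch or Spark APIs, and a plain List stands in for the RDD; the real version would call JavaRDD.collect() on the driver-only branch and keep the existing MRPipeline-style HDFS write on the mapside-join branch:

```java
import java.util.Arrays;
import java.util.List;

// Toy sketch of a context-aware materialize. hdfsWrites just counts how
// often the expensive HDFS path runs, so the two branches are observable.
public class MaterializeSketch {

  static int hdfsWrites = 0;

  /** Stand-in for the HDFS write + distributed-cache read-back path. */
  static <T> List<T> materializeViaHdfs(List<T> data) {
    hdfsWrites++; // real code would saveAsNewAPIHadoopFile here
    return data;
  }

  /**
   * Choose a strategy from the client context: a later job needs the data
   * as an HDFS file (mapside join), or the caller only needs the values
   * on the driver (the collect() case).
   */
  static <T> List<T> materialize(List<T> data, boolean neededOnHdfs) {
    return neededOnHdfs ? materializeViaHdfs(data) : data; // "collect()"
  }

  public static void main(String[] args) {
    List<Float> deltas = Arrays.asList(0.3f, 0.1f, 0.7f);
    materialize(deltas, false); // driver-only: no HDFS round trip
    materialize(deltas, true);  // mapside join: HDFS file still written
    System.out.println(hdfsWrites); // prints 1
  }
}
```

The point of the branch is that only consumers like mapside joins force the HDFS round trip; a plain client-side read (as in the PageRank delta below) could skip it entirely.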

> PCollection.materialize with Spark should use collect()
> -------------------------------------------------------
>
>                 Key: CRUNCH-510
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-510
>             Project: Crunch
>          Issue Type: Improvement
>          Components: Core
>            Reporter: Micah Whitacre
>            Assignee: Josh Wills
>
> While troubleshooting some other code, I noticed that when using the 
> SparkPipeline, code that forces a materialize() to be called...
> {code}
>       delta = Aggregate.max(scores.parallelDo(new MapFn<Pair<String, 
> PageRankData>, Float>() {
>         @Override
>         public Float map(Pair<String, PageRankData> input) {
>           PageRankData prd = input.second();
>           return Math.abs(prd.score - prd.lastScore);
>         }
>       }, ptf.floats())).getValue();
> {code}
> the underlying code actually results in the value being written out to HDFS:
> {noformat}
> 15/04/08 13:59:33 INFO DAGScheduler: Job 1 finished: saveAsNewAPIHadoopFile 
> at SparkRuntime.java:332, took 0.223622 s
> {noformat}
> Since Spark's RDDs have a collect() method that accomplishes similar 
> functionality, I wonder if we could switch to using it and cut down on the 
> need to persist the data to HDFS.  I think the current behavior comes from 
> sharing logic between MRPipeline and SparkPipeline, and I have no context on 
> how we could break that apart easily.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
