[ 
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15005677#comment-15005677
 ] 

Dmitriy Lyubimov commented on MAHOUT-1790:
------------------------------------------

+1. treeReduce() is a new API that appeared in Spark 1.3. The only complaint 
about this API I have heard is "why didn't they do it this way in the first 
place". They may have taken care of optimizing the depth too. It is 
semantically equivalent to reduce().

This means the fix is applicable to the Mahout 0.11.x+ branches.
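
To illustrate the "semantically equivalent" point, here is a minimal plain-Scala sketch (no Spark required) that simulates combining per-partition partial vectors either all at once or in rounds. It is only a simulation of the idea, not Spark's actual treeReduce() implementation; the names `TreeReduceSketch`, `add`, `flatReduce`, `treeReduce` and the `fanIn` parameter are made up for this example.

```scala
object TreeReduceSketch {
  type Vec = Array[Double]

  // Element-wise sum, standing in for `_ += _` on vectors.
  def add(a: Vec, b: Vec): Vec = a.zip(b).map { case (x, y) => x + y }

  // Flat reduce: every per-partition partial is combined in one place.
  def flatReduce(partials: Seq[Vec]): Vec = partials.reduce(add)

  // Tree-style reduce (simulation): merge partials in groups of `fanIn`
  // per round until at most `fanIn` remain, then combine what is left.
  // `fanIn` is a hypothetical knob for this sketch, not a Spark parameter.
  def treeReduce(partials: Seq[Vec], fanIn: Int): Vec = {
    require(fanIn >= 2 && partials.nonEmpty)
    var level: Seq[Vec] = partials
    while (level.size > fanIn) {
      level = level.grouped(fanIn).map(_.reduce(add)).toVector
    }
    level.reduce(add)
  }

  def main(args: Array[String]): Unit = {
    // 16 "partitions", each contributing a 4-element partial vector.
    val partials = Seq.tabulate(16)(i => Array.fill(4)(i.toDouble))
    val flat = flatReduce(partials)
    val tree = treeReduce(partials, fanIn = 4)
    println(flat.sameElements(tree))
  }
}
```

With 16 partials and a fan-in of 4, the final step of the tree variant only sees 4 intermediate vectors instead of 16, which is the property that matters for the driver here. On a real RDD the corresponding call would be rdd.treeReduce(_ += _, depth).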

[~FlamingMike], in the spirit of the Apache "power of do", could you try to 
put together a patch?

> SparkEngine nnz overflow resultSize when reducing.
> --------------------------------------------------
>
>                 Key: MAHOUT-1790
>                 URL: https://issues.apache.org/jira/browse/MAHOUT-1790
>             Project: Mahout
>          Issue Type: Bug
>          Components: spark
>    Affects Versions: 0.11.1
>            Reporter: Michel Lemay
>            Priority: Minor
>
> When counting numNonZeroElementsPerColumn in the Spark engine with a large 
> number of columns, we get the following error:
> ERROR TaskSetManager: Total size of serialized results of nnn tasks (1031.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
> and then the call stack:
> org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 267 tasks (1024.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
>         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
>         at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
>         at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
>         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
>         at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
>         at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
>         at org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:86)
>         at org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:37)
>         at org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:286)
>         at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:66)
>         at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:141)
> This occurs because it uses a DenseVector, and Spark seemingly aggregates all 
> of them on the driver before reducing.
> I think this could easily be prevented by using treeReduce(_ += _, depth) 
> instead of reduce(_ += _).
> 'depth' could be computed as a function of 'n' and the number of partitions, 
> something along the lines of:
>   val maxResultSize = ....
>   val numPartitions = drm.rdd.partitions.size
>   val n = drm.ncol
>   val bytesPerVector = n * 8 + overhead?
>   val maxVectors = maxResultSize / bytesPerVector / 2 + 1 // be safe
>   val depth = math.max(1, math.ceil(math.log(1 + numPartitions / maxVectors) / math.log(2)).toInt)
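
The depth heuristic quoted above can be written as a small standalone function to experiment with. This is only a sketch of the reporter's suggestion under stated assumptions: `TreeDepthSketch`, `treeDepth`, and the default `overheadBytes = 64` are made up for illustration, the result-size limit is passed in rather than read from Spark, and the ratio is computed in floating point (the quoted expression, if those values are integers, would use integer division). Whether this heuristic picks a good depth is not established in the thread.

```scala
object TreeDepthSketch {
  // Estimate a treeReduce depth from the number of partitions, the vector
  // width (ncol) and the driver's result-size limit in bytes.
  // `overheadBytes` is a guessed per-vector serialization overhead.
  def treeDepth(numPartitions: Int,
                ncol: Int,
                maxResultSizeBytes: Long,
                overheadBytes: Long = 64L): Int = {
    // A dense vector of doubles takes roughly 8 bytes per column.
    val bytesPerVector = ncol.toLong * 8L + overheadBytes
    // How many vectors fit in half of the limit, plus one ("be safe").
    val maxVectors = maxResultSizeBytes / bytesPerVector / 2 + 1
    // Floating-point ratio; an assumption of this sketch.
    val ratio = numPartitions.toDouble / maxVectors
    math.max(1, math.ceil(math.log(1 + ratio) / math.log(2)).toInt)
  }

  def main(args: Array[String]): Unit = {
    // 267 partitions as in the stack trace; 500000 columns is an assumed width.
    println(treeDepth(267, 500000, 1024L * 1024 * 1024))
  }
}
```

The resulting value would then be passed as the second argument of treeReduce(_ += _, depth).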


