[
https://issues.apache.org/jira/browse/MAHOUT-1790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15761140#comment-15761140
]
Michel Lemay commented on MAHOUT-1790:
--------------------------------------
Right now, we are not using Mahout in production code. We found that matrix
operations in distributed clusters with Spark tend to be way too slow, and we
prefer a shared-memory approach with bigger machines.
> SparkEngine nnz overflow resultSize when reducing.
> --------------------------------------------------
>
> Key: MAHOUT-1790
> URL: https://issues.apache.org/jira/browse/MAHOUT-1790
> Project: Mahout
> Issue Type: Bug
> Components: spark
> Affects Versions: 0.11.1
> Reporter: Michel Lemay
> Assignee: Dmitriy Lyubimov
> Priority: Minor
> Fix For: 0.13.0
>
>
> When counting numNonZeroElementsPerColumn in the Spark engine with a large
> number of columns, we get the following error:
> ERROR TaskSetManager: Total size of serialized results of nnn tasks (1031.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
> and then, the call stack:
> org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 267 tasks (1024.1 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1942)
> at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1003)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
> at org.apache.spark.rdd.RDD.reduce(RDD.scala:985)
> at org.apache.mahout.sparkbindings.SparkEngine$.numNonZeroElementsPerColumn(SparkEngine.scala:86)
> at org.apache.mahout.math.drm.CheckpointedOps.numNonZeroElementsPerColumn(CheckpointedOps.scala:37)
> at org.apache.mahout.math.cf.SimilarityAnalysis$.sampleDownAndBinarize(SimilarityAnalysis.scala:286)
> at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrences(SimilarityAnalysis.scala:66)
> at org.apache.mahout.math.cf.SimilarityAnalysis$.cooccurrencesIDSs(SimilarityAnalysis.scala:141)
> This occurs because it uses a DenseVector, and Spark seemingly aggregates
> all of them on the driver before reducing.
> I think this could be easily prevented by using treeReduce(_ += _, depth)
> instead of reduce(_ += _).
> 'depth' could be computed as a function of 'n' and numberOfPartitions,
> something along the lines of:
> val maxResultSize = ....
> val numPartitions = drm.rdd.partitions.size
> val n = drm.ncol
> val bytesPerVector = n * 8 + overhead?
> val maxVectors = maxResultSize / bytesPerVector / 2 + 1 // be safe
> val depth = math.max(1, math.ceil(math.log(1 + numPartitions / maxVectors)
> / math.log(2)).toInt)
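
For illustration, the depth heuristic quoted above could be written out as a
small standalone Scala object. This is only a sketch under stated assumptions,
not what Mahout does: the object name TreeReduceDepth, the function names, and
the 64-byte per-vector overhead are hypothetical, and the ratio is computed
with floating-point division (the quoted snippet leaves that detail open).

```scala
object TreeReduceDepth {
  // Hypothetical fixed serialization overhead per vector, in bytes.
  // The issue leaves this value open ("overhead?").
  val AssumedOverheadBytes: Long = 64L

  /** Rough serialized size of a dense vector holding n doubles. */
  def bytesPerVector(n: Long, overhead: Long = AssumedOverheadBytes): Long =
    n * 8L + overhead

  /** Depth heuristic sketched in the issue description.
    *
    * maxResultSize is in bytes and is assumed to mirror the configured
    * spark.driver.maxResultSize; numPartitions is the number of partial
    * vectors produced by the tasks; n is the number of columns.
    */
  def treeDepth(maxResultSize: Long, numPartitions: Int, n: Long): Int = {
    // Number of vectors that fit in half of the budget ("be safe").
    val maxVectors = maxResultSize / bytesPerVector(n) / 2 + 1
    // Assumption: floating-point division, so partial ratios are not truncated.
    val ratio = 1.0 + numPartitions.toDouble / maxVectors
    math.max(1, math.ceil(math.log(ratio) / math.log(2)).toInt)
  }

  def main(args: Array[String]): Unit = {
    val oneGiB = 1024L * 1024L * 1024L
    // 267 tasks as in the stack trace; 500000 columns is a made-up figure.
    println(treeDepth(oneGiB, 267, 500000L))
  }
}
```

The resulting value would then be passed as the second argument of Spark's
RDD.treeReduce(f, depth) in place of the plain reduce call.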