A 60M-element vector of doubles costs 480MB of memory. You have 12 of them
to be reduced on the driver, so you need roughly 6GB of memory, not counting
the temporary vectors generated by `_ + _`. You need to increase the driver
memory to make it work. That said, ~10^7 elements is about the limit for the
current implementation of glm.

-Xiangrui

On Jan 23, 2015 2:19 PM, "DB Tsai" <dbt...@dbtsai.com> wrote:
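The memory estimate above is simple arithmetic; a minimal sketch of the same back-of-the-envelope calculation (the object and method names are illustrative only):

```scala
// Back-of-the-envelope driver memory for reducing p dense vectors of n doubles.
object ReduceMemoryEstimate {
  // A Double occupies 8 bytes on the JVM (ignoring object/array headers).
  def vectorBytes(n: Long): Long = n * 8L

  def totalBytes(n: Long, p: Long): Long = vectorBytes(n) * p

  def main(args: Array[String]): Unit = {
    val n = 60000000L // 60M elements per vector, as in the thread
    val p = 12L       // one vector per partition, all pulled to the driver
    println(s"one vector: ${vectorBytes(n) / 1000000} MB")    // 480 MB
    println(s"all on driver: ${totalBytes(n, p) / 1.0e9} GB") // 5.76 GB
  }
}
```

This ignores the temporaries created by `_ + _`, which is why ~6GB is only a lower bound for the driver heap.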
> Hi Alexander,
>
> `reduce` is an action that collects all the data from the mappers to the
> driver and performs the aggregation on the driver. As a result, if the
> output of each mapper is large and the number of mapper partitions is
> large, it can cause a problem.
>
> `treeReduce`, as the name indicates, works in layers: in the first layer
> it aggregates the mapper outputs two by two, halving the number of
> outputs, and then it continues the aggregation layer by layer. The final
> aggregation is still done on the driver, but by then the number of
> partial results is small.
>
> By default a depth of 2 is used, so if you have many partitions of large
> vectors, this may still cause issues. You can increase the depth so that
> the final reduce on the driver sees only a very small number of partial
> results.
>
> Sincerely,
>
> DB Tsai
> -------------------------------------------------------
> Blog: https://www.dbtsai.com
> LinkedIn: https://www.linkedin.com/in/dbtsai
>
> On Fri, Jan 23, 2015 at 12:07 PM, Ulanov, Alexander
> <alexander.ula...@hp.com> wrote:
> > Hi DB Tsai,
> >
> > Thank you for your suggestion. Actually, I started my experiments with
> > "treeReduce". Originally I had "vv.treeReduce(_ + _, 2)" in my script
> > precisely because the MLlib optimizers use it, as you pointed out with
> > LBFGS. However, it leads to the same problems as "reduce", though
> > presumably less directly. As far as I understand, treeReduce limits the
> > number of communications between workers and master by forcing workers
> > to partially compute the reduce operation.
> >
> > Are you sure that the driver first collects all results (or all partial
> > results in treeReduce) and ONLY then performs the aggregation? If that
> > is the problem, then how can I force it to aggregate after receiving
> > each portion of data from the workers?
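The layer-by-layer aggregation described above can be mimicked locally without Spark; a sketch of pairwise tree reduction under that description (`treeReduceLocal` is a hypothetical helper, not Spark's actual implementation):

```scala
object TreeReduceSketch {
  // Merge values two by two, level by level, until one value remains.
  // Each level halves the number of partial results; in Spark, only the
  // last, small level would be sent to the driver.
  def treeReduceLocal[T](xs: Seq[T])(op: (T, T) => T): T = {
    require(xs.nonEmpty, "cannot reduce an empty collection")
    var level = xs
    while (level.size > 1) {
      level = level.grouped(2).map {
        case Seq(a, b) => op(a, b) // merge a pair
        case Seq(a)    => a        // odd element passes through
      }.toSeq
    }
    level.head
  }
}
```

With 12 partitions this takes four levels (12 → 6 → 3 → 2 → 1); Spark's `depth` parameter bounds how many such levels run across the executors before the remaining partial results reach the driver.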
> >
> > Best regards, Alexander
> >
> > -----Original Message-----
> > From: DB Tsai [mailto:dbt...@dbtsai.com]
> > Sent: Friday, January 23, 2015 11:53 AM
> > To: Ulanov, Alexander
> > Cc: dev@spark.apache.org
> > Subject: Re: Maximum size of vector that reduce can handle
> >
> > Hi Alexander,
> >
> > When you use `reduce` to aggregate the vectors, they are actually pulled
> > into the driver and merged there. Obviously, that is not scalable given
> > that you are working with deep neural networks, which have so many
> > coefficients.
> >
> > Please try treeReduce instead, which is what we do in linear regression
> > and logistic regression. See
> > https://github.com/apache/spark/blob/branch-1.1/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
> > for an example:
> >
> > val (gradientSum, lossSum) = data.treeAggregate((Vectors.zeros(n), 0.0))(
> >   seqOp = (c, v) => (c, v) match {
> >     case ((grad, loss), (label, features)) =>
> >       val l = localGradient.compute(features, label, bcW.value, grad)
> >       (grad, loss + l)
> >   },
> >   combOp = (c1, c2) => (c1, c2) match {
> >     case ((grad1, loss1), (grad2, loss2)) =>
> >       axpy(1.0, grad2, grad1)
> >       (grad1, loss1 + loss2)
> >   })
> >
> > Sincerely,
> >
> > DB Tsai
> > -------------------------------------------------------
> > Blog: https://www.dbtsai.com
> > LinkedIn: https://www.linkedin.com/in/dbtsai
> >
> > On Fri, Jan 23, 2015 at 10:00 AM, Ulanov, Alexander
> > <alexander.ula...@hp.com> wrote:
> >> Dear Spark developers,
> >>
> >> I am trying to measure Spark's reduce performance for big vectors. My
> >> motivation is related to machine learning gradients: the gradient is a
> >> vector that is computed on each worker, and then all the results need
> >> to be summed up and broadcast back to the workers. Present machine
> >> learning applications involve very long parameter vectors; for deep
> >> neural networks they can have up to 2 billion elements.
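The `treeAggregate` call quoted above combines a per-record `seqOp` with a cross-partition `combOp`; a self-contained local sketch of the same shape, with the gradient math replaced by toy array updates (all names here are hypothetical, not MLlib's):

```scala
object AggregateSketch {
  // Accumulator: (gradient-like array, running loss), as in the LBFGS example.
  type Acc = (Array[Double], Double)

  // seqOp: fold one (label, features) record into a partition's accumulator.
  def seqOp(c: Acc, v: (Double, Array[Double])): Acc = {
    val (grad, loss) = c
    val (label, features) = v
    for (i <- grad.indices) grad(i) += features(i) // toy stand-in for a gradient update
    (grad, loss + label)                           // toy stand-in for a loss term
  }

  // combOp: merge two accumulators in place, like axpy(1.0, grad2, grad1).
  def combOp(c1: Acc, c2: Acc): Acc = {
    for (i <- c1._1.indices) c1._1(i) += c2._1(i)
    (c1._1, c1._2 + c2._2)
  }

  // "Partitions" of two records each, folded with seqOp, merged with combOp.
  def aggregate(data: Seq[(Double, Array[Double])], n: Int): Acc =
    data.grouped(2)
        .map(_.foldLeft((new Array[Double](n), 0.0))(seqOp))
        .reduce(combOp)
}
```

The key property is that `seqOp` runs where the data lives and `combOp` merges already-reduced accumulators, so full per-record data never has to travel to the driver.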
> >> So, I want to measure the time needed for this operation depending on
> >> the size of the vector and the number of workers. I wrote a few lines
> >> of code that assume Spark will distribute partitions among all
> >> available workers. I have a 6-machine cluster (Xeon 3.3GHz, 4 cores,
> >> 16GB RAM), each machine running 2 Workers.
> >>
> >> import org.apache.spark.mllib.rdd.RDDFunctions._
> >> import breeze.linalg._
> >> import org.apache.log4j._
> >> Logger.getRootLogger.setLevel(Level.OFF)
> >> val n = 60000000
> >> val p = 12
> >> val vv = sc.parallelize(0 until p, p).map(i => DenseVector.rand[Double](n))
> >> vv.reduce(_ + _)
> >>
> >> When executed in the shell with a 60M vector, it crashes after some
> >> period of time. One of the nodes has the following in stdout:
> >>
> >> Java HotSpot(TM) 64-Bit Server VM warning: INFO:
> >> os::commit_memory(0x0000000755500000, 2863661056, 0) failed;
> >> error='Cannot allocate memory' (errno=12)
> >> #
> >> # There is insufficient memory for the Java Runtime Environment to continue.
> >> # Native memory allocation (malloc) failed to allocate 2863661056 bytes
> >> for committing reserved memory.
> >>
> >> I run the shell with --executor-memory 8G --driver-memory 8G, so
> >> handling a 60M vector of Double should not be a problem. Are there any
> >> big overheads involved? What is the maximum size of vector that reduce
> >> can handle?
> >>
> >> Best regards, Alexander
> >>
> >> P.S.
> >>
> >> "spark.driver.maxResultSize 0" needs to be set in order to run this
> >> code. I also had to change the "java.io.tmpdir" and "spark.local.dir"
> >> folders, because my /tmp folder, which is the default, was too small
> >> and Spark swaps heavily into it. Without these settings I get either
> >> "no space left on device" or "out of memory" exceptions.
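The workarounds from the P.S. can be gathered into one place; a sketch of a configuration along those lines (the local directory path is a placeholder, and setting `spark.driver.maxResultSize` to 0 removes the result-size cap entirely):

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration mirroring the settings mentioned in the thread.
// Note: executor/driver memory are normally given at launch time, e.g.
//   spark-shell --executor-memory 8G --driver-memory 8G
// because the driver JVM is already running by the time SparkConf is read.
val conf = new SparkConf()
  .setAppName("big-vector-reduce")
  .set("spark.driver.maxResultSize", "0")      // 0 = no cap on results collected to driver
  .set("spark.local.dir", "/path/to/big/disk") // placeholder; avoid a small default /tmp
// java.io.tmpdir is a JVM property; pass -Djava.io.tmpdir=/path/to/big/disk
// via the driver/executor extra Java options.
```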
> >>
> >> I also submitted a bug:
> >> https://issues.apache.org/jira/browse/SPARK-5386
> >>
> >> ---------------------------------------------------------------------
> >> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
> >> For additional commands, e-mail: dev-h...@spark.apache.org